Coverage for python / lsst / pipe / base / _task_metadata.py: 17%
220 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-06 08:32 +0000
1# This file is part of pipe_base.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This software is dual licensed under the GNU General Public License and also
10# under a 3-clause BSD license. Recipients may choose which of these licenses
11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
12# respectively. If you choose the GPL option then the following text applies
13# (but note that there is still no warranty even if you opt for BSD instead):
14#
15# This program is free software: you can redistribute it and/or modify
16# it under the terms of the GNU General Public License as published by
17# the Free Software Foundation, either version 3 of the License, or
18# (at your option) any later version.
19#
20# This program is distributed in the hope that it will be useful,
21# but WITHOUT ANY WARRANTY; without even the implied warranty of
22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23# GNU General Public License for more details.
24#
25# You should have received a copy of the GNU General Public License
26# along with this program. If not, see <https://www.gnu.org/licenses/>.
# Public API of this module, listed alphabetically.
__all__ = [
    "GetDictMetadata",
    "GetSetDictMetadata",
    "NestedMetadataDict",
    "SetDictMetadata",
    "TaskMetadata",
]
36import itertools
37import numbers
38import sys
39from collections.abc import Collection, Iterator, Mapping, Sequence
40from typing import Annotated, Any, Protocol
42from pydantic import (
43 BaseModel,
44 BeforeValidator,
45 ConfigDict,
46 Field,
47 StrictBool,
48 StrictFloat,
49 StrictInt,
50 StrictStr,
51)
# The types allowed in a Task metadata field are restricted
# to allow predictable serialization.
# Membership is tested with exact ``type()`` comparisons in
# ``TaskMetadata._validate_value``, so ``bool`` must be listed explicitly
# even though it subclasses ``int``.
_ALLOWED_PRIMITIVE_TYPES = (str, float, int, bool)
# Recursive alias for a possibly-nested mapping of string keys to the
# primitive metadata types.
# Note that '|' syntax for unions doesn't work when we have to use a string
# literal (and we do since it's recursive and not an annotation).
type NestedMetadataDict = Mapping[str, str | float | int | bool | "NestedMetadataDict"]
class PropertySetLike(Protocol):
    """Protocol that looks like a ``lsst.daf.base.PropertySet``.

    Only the minimal slice of the ``PropertySet`` API needed to convert
    such an object to a `TaskMetadata` is declared here.
    """

    def paramNames(self, topLevelOnly: bool = True) -> Collection[str]: ...

    def getArray(self, name: str) -> Any: ...
74def _isListLike(v: Any) -> bool:
75 return isinstance(v, Sequence) and not isinstance(v, str)
class SetDictMetadata(Protocol):
    """Protocol for objects that can be assigned a possibly-nested `dict` of
    primitives.

    `TaskMetadata`, `lsst.daf.base.PropertySet`, and
    `lsst.daf.base.PropertyList` all satisfy this protocol, giving callers a
    uniform way to store a dictionary in any of them without tripping over
    their historical idiosyncrasies.

    How the entries appear in the object's native keys and values is
    implementation-defined. *Empty nested dictionaries may be dropped, and
    if the top-level dictionary is empty this method may do nothing.*

    Neither the top-level key nor nested keys may contain ``.`` (period)
    characters.
    """

    def set_dict(self, key: str, nested: "NestedMetadataDict") -> None: ...
class GetDictMetadata(Protocol):
    """Protocol for objects that can extract a possibly-nested mapping of
    primitives.

    `TaskMetadata`, `lsst.daf.base.PropertySet`, and
    `lsst.daf.base.PropertyList` all satisfy this protocol, giving callers a
    uniform way to extract a dictionary from any of them without tripping
    over their historical idiosyncrasies.

    Extraction is guaranteed to work only for mappings inserted via
    `~SetMapping.set_dict`; values inserted any other way may not round-trip.
    If nothing was ever inserted with the given key, *an empty `dict` will
    be returned* (this is a concession to implementation constraints in
    `~lsst.daf.base.PropertyList`.
    """

    def get_dict(self, key: str) -> "NestedMetadataDict": ...
class GetSetDictMetadata(SetDictMetadata, GetDictMetadata, Protocol):
    """Protocol for objects that support both assignment and extraction of a
    possibly-nested mapping of primitives.
    """
122# Some TaskMetadata JSON representations have been written (provenance files
123# for the DP2 production) with NaNs converted to JSON null rather than a
124# constant "inf" or "nan", since the Pydantic model_config ser_json_inf_nan
125# doesn't automatically get picked up by parent models. This before-validator
126# turns those nulls back into NaNs, making the metadata readable again.
127def _convert_null_to_nan(value: Any) -> float:
128 if value is None:
129 return float("nan")
130 return float(value)
# Strict float that additionally accepts JSON null (written by some older
# serializations for NaN -- see the comment on _convert_null_to_nan) and
# turns it back into NaN before validation.
type _NullToNanFloat = Annotated[StrictFloat, BeforeValidator(_convert_null_to_nan)]
class TaskMetadata(BaseModel):
    """Dict-like object for storing task metadata.

    Metadata can be stored at two levels: single task or task plus subtasks.
    The latter is called full metadata of a task and has a form

        topLevelTaskName:subtaskName:subsubtaskName.itemName

    Metadata item key of a task (``itemName`` above) must not contain ``.``,
    which serves as a separator in full metadata keys and turns
    the value into sub-dictionary. Arbitrary hierarchies are supported.
    """

    # Pipelines regularly generate NaN and Inf so these need to be
    # supported even though that's a JSON extension. Note that any parent
    # models that might hold a TaskMetadata also need to set this explicitly!
    model_config = ConfigDict(ser_json_inf_nan="constants")

    # Items are partitioned into three slots -- scalars, homogeneous arrays,
    # and nested TaskMetadata -- so Pydantic can validate and serialize them
    # predictably. The mapping-style accessors below hide this layout.
    scalars: dict[str, _NullToNanFloat | StrictInt | StrictBool | StrictStr] = Field(default_factory=dict)
    arrays: dict[str, list[_NullToNanFloat] | list[StrictInt] | list[StrictBool] | list[StrictStr]] = Field(
        default_factory=dict
    )
    metadata: dict[str, "TaskMetadata"] = Field(default_factory=dict)

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]) -> "TaskMetadata":
        """Create a TaskMetadata from a dictionary.

        Parameters
        ----------
        d : `~collections.abc.Mapping`
            Mapping to convert. Can be hierarchical. Any dictionaries
            in the hierarchy are converted to `TaskMetadata`.

        Returns
        -------
        meta : `TaskMetadata`
            Newly-constructed metadata.
        """
        metadata = cls()
        for k, v in d.items():
            metadata[k] = v
        return metadata

    @classmethod
    def from_metadata(cls, ps: PropertySetLike) -> "TaskMetadata":
        """Create a TaskMetadata from a PropertySet-like object.

        Parameters
        ----------
        ps : `PropertySetLike` or `TaskMetadata`
            A ``PropertySet``-like object to be transformed to a
            `TaskMetadata`. A `TaskMetadata` can be copied using this
            class method.

        Returns
        -------
        tm : `TaskMetadata`
            Newly-constructed metadata.

        Notes
        -----
        Items stored in single-element arrays in the supplied object
        will be converted to scalars in the newly-created object.
        """
        # Use hierarchical names to assign values from input to output.
        # This API exists for both PropertySet and TaskMetadata.
        # from_dict() does not work because PropertySet is not declared
        # to be a Mapping.
        # PropertySet.toDict() is not present in TaskMetadata so is best
        # avoided.
        metadata = cls()
        for key in sorted(ps.paramNames(topLevelOnly=False)):
            value = ps.getArray(key)
            if len(value) == 1:
                value = value[0]
            metadata[key] = value
        return metadata

    def to_dict(self) -> dict[str, Any]:
        """Convert the class to a simple dictionary.

        Returns
        -------
        d : `dict`
            Simple dictionary that can contain scalar values, array values
            or other dictionary values.

        Notes
        -----
        Unlike `dict()`, this method hides the model layout and combines
        scalars, arrays, and other metadata in the same dictionary. Can be
        used when a simple dictionary is needed. Use
        `TaskMetadata.from_dict()` to convert it back.
        """
        d: dict[str, Any] = {}
        d.update(self.scalars)
        d.update(self.arrays)
        for k, v in self.metadata.items():
            d[k] = v.to_dict()
        return d

    def add(self, name: str, value: Any) -> None:
        """Store a new value, adding to a list if one already exists.

        Parameters
        ----------
        name : `str`
            Name of the metadata property. Can be dot-separated
            hierarchical.
        value : `~typing.Any`
            Metadata property value.

        Raises
        ------
        ValueError
            Raised if the value is not a primitive type (or sequence of
            primitive types), or if the key already refers to a nested
            `TaskMetadata`.
        """
        keys = self._getKeys(name)
        key0 = keys.pop(0)
        if len(keys) == 0:
            # If add() is being used, always store the value in the arrays
            # property as a list. It's likely there will be another call.
            slot_type, value = self._validate_value(value)
            if slot_type == "array":
                pass
            elif slot_type == "scalar":
                value = [value]
            else:
                raise ValueError("add() can only be used for primitive types or sequences of those types.")

            if key0 in self.metadata:
                raise ValueError(f"Can not add() to key '{name}' since that is a TaskMetadata")

            if key0 in self.scalars:
                # Convert scalar to array.
                # MyPy should be able to figure out that List[Union[T1, T2]] is
                # compatible with Union[List[T1], List[T2]] if the list has
                # only one element, but it can't.
                self.arrays[key0] = [self.scalars.pop(key0)]  # type: ignore

            if key0 in self.arrays:
                # Check that the type is not changing. The element-type
                # comparison is only possible when both sides are non-empty;
                # previously an empty list on either side raised IndexError.
                if (
                    self.arrays[key0]
                    and value
                    and (curtype := type(self.arrays[key0][0])) is not (newtype := type(value[0]))
                ):
                    raise ValueError(f"Type mismatch in add() -- currently {curtype} but adding {newtype}")
                self.arrays[key0].extend(value)
            else:
                self.arrays[key0] = value

            return

        # NOTE(review): unlike __setitem__, this does not auto-create the
        # intermediate TaskMetadata, so add("a.b", v) raises KeyError when
        # "a" is absent -- confirm this asymmetry is intended.
        self.metadata[key0].add(".".join(keys), value)

    def getScalar(self, key: str) -> str | int | float | bool:
        """Retrieve a scalar item even if the item is a list.

        Parameters
        ----------
        key : `str`
            Item to retrieve.

        Returns
        -------
        value : `str`, `int`, `float`, or `bool`
            Either the value associated with the key or, if the key
            corresponds to a list, the last item in the list.

        Raises
        ------
        KeyError
            Raised if the item is not found.
        """
        # Used in pipe_tasks.
        # getScalar() is the default behavior for __getitem__.
        return self[key]

    def getArray(self, key: str) -> list[Any]:
        """Retrieve an item as a list even if it is a scalar.

        Parameters
        ----------
        key : `str`
            Item to retrieve.

        Returns
        -------
        values : `list` of any
            A list containing the value or values associated with this item.

        Raises
        ------
        KeyError
            Raised if the item is not found.
        """
        keys = self._getKeys(key)
        key0 = keys.pop(0)
        if len(keys) == 0:
            if key0 in self.arrays:
                return self.arrays[key0]
            elif key0 in self.scalars:
                return [self.scalars[key0]]
            elif key0 in self.metadata:
                return [self.metadata[key0]]
            raise KeyError(f"'{key}' not found")

        try:
            return self.metadata[key0].getArray(".".join(keys))
        except KeyError:
            # Report the correct key.
            raise KeyError(f"'{key}' not found") from None

    def names(self) -> set[str]:
        """Return the hierarchical keys from the metadata.

        Returns
        -------
        names : `collections.abc.Set`
            A set of all keys, including those from the hierarchy and the
            top-level hierarchy.
        """
        names = set()
        for k, v in self.items():
            names.add(k)  # Always include the current level
            if isinstance(v, TaskMetadata):
                names.update({k + "." + item for item in v.names()})
        return names

    def paramNames(self, topLevelOnly: bool) -> set[str]:
        """Return hierarchical names.

        Parameters
        ----------
        topLevelOnly : `bool`
            Control whether only top-level items are returned or items
            from the hierarchy.

        Returns
        -------
        paramNames : `set` of `str`
            If ``topLevelOnly`` is `True`, returns any keys that are not
            part of a hierarchy. If `False` also returns fully-qualified
            names from the hierarchy. Keys associated with the top
            of a hierarchy are never returned.
        """
        # Currently used by the verify package.
        paramNames = set()
        for k, v in self.items():
            if isinstance(v, TaskMetadata):
                if not topLevelOnly:
                    paramNames.update({k + "." + item for item in v.paramNames(topLevelOnly=topLevelOnly)})
            else:
                paramNames.add(k)
        return paramNames

    @staticmethod
    def _getKeys(key: str) -> list[str]:
        """Return the key hierarchy.

        Parameters
        ----------
        key : `str`
            The key to analyze. Can be dot-separated.

        Returns
        -------
        keys : `list` of `str`
            The key hierarchy that has been split on ``.``.

        Raises
        ------
        KeyError
            Raised if the key is not a string.
        """
        try:
            keys = key.split(".")
        except Exception:
            raise KeyError(f"Invalid key '{key}': only string keys are allowed") from None
        return keys

    def keys(self) -> tuple[str, ...]:
        """Return the top-level keys."""
        return tuple(self)

    def items(self) -> Iterator[tuple[str, Any]]:
        """Yield the top-level keys and values."""
        yield from itertools.chain(self.scalars.items(), self.arrays.items(), self.metadata.items())

    def __len__(self) -> int:
        """Return the number of items."""
        return len(self.scalars) + len(self.arrays) + len(self.metadata)

    # This is actually a Liskov substitution violation, because
    # pydantic.BaseModel says __iter__ should return something else. But the
    # pydantic docs say to do exactly this to in order to make a mapping-like
    # BaseModel, so that's what we do.
    def __iter__(self) -> Iterator[str]:  # type: ignore
        """Return an iterator over each key."""
        # The order of keys is not preserved since items can move
        # from scalar to array.
        return itertools.chain(iter(self.scalars), iter(self.arrays), iter(self.metadata))

    def __getitem__(self, key: str) -> Any:
        """Retrieve the item associated with the key.

        Parameters
        ----------
        key : `str`
            The key to retrieve. Can be dot-separated hierarchical.

        Returns
        -------
        value : `TaskMetadata`, `float`, `int`, `bool`, `str`
            A scalar value. For compatibility with ``PropertySet``, if the key
            refers to an array, the final element is returned and not the
            array itself.

        Raises
        ------
        KeyError
            Raised if the item is not found.
        """
        keys = self._getKeys(key)
        key0 = keys.pop(0)
        if len(keys) == 0:
            if key0 in self.scalars:
                return self.scalars[key0]
            if key0 in self.metadata:
                return self.metadata[key0]
            if key0 in self.arrays:
                arr = self.arrays[key0]
                if not arr:
                    # If there are no elements then returning a scalar
                    # is an error.
                    raise KeyError(f"'{key}' not found")
                return arr[-1]
            raise KeyError(f"'{key}' not found")
        # Hierarchical lookup so the top key can only be in the metadata
        # property. Trap KeyError and reraise so that the correct key
        # in the hierarchy is reported.
        try:
            # And forward request to that metadata.
            return self.metadata[key0][".".join(keys)]
        except KeyError:
            raise KeyError(f"'{key}' not found") from None

    def get(self, key: str, default: Any = None) -> Any:
        """Retrieve the item associated with the key or a default.

        Parameters
        ----------
        key : `str`
            The key to retrieve. Can be dot-separated hierarchical.
        default : `~typing.Any`
            The value to return if the key does not exist.

        Returns
        -------
        value : `TaskMetadata`, `float`, `int`, `bool`, `str`
            A scalar value. If the key refers to an array, the final element
            is returned and not the array itself; this is consistent with
            `__getitem__` and `PropertySet.get`, but not ``to_dict().get``.
        """
        try:
            return self[key]
        except KeyError:
            return default

    def __setitem__(self, key: str, item: Any) -> None:
        """Store the given item."""
        keys = self._getKeys(key)
        key0 = keys.pop(0)
        if len(keys) == 0:
            slots: dict[str, dict[str, Any]] = {
                "array": self.arrays,
                "scalar": self.scalars,
                "metadata": self.metadata,
            }
            slot_type, item = self._validate_value(item)
            primary: dict[str, Any] | None = slots.pop(slot_type, None)
            if primary is None:
                raise AssertionError(f"Unknown slot type returned from validator: {slot_type}")

            # Assign the value to the right place.
            primary[key0] = item
            for slot in slots.values():
                # Remove any other entries with the same name.
                slot.pop(key0, None)
            return

        # This must be hierarchical so forward to the child TaskMetadata.
        if key0 not in self.metadata:
            self.metadata[key0] = TaskMetadata()
        self.metadata[key0][".".join(keys)] = item

        # Ensure we have cleared out anything with the same name elsewhere.
        self.scalars.pop(key0, None)
        self.arrays.pop(key0, None)

    def __contains__(self, key: str) -> bool:
        """Determine if the key exists."""
        keys = self._getKeys(key)
        key0 = keys.pop(0)
        if len(keys) == 0:
            return key0 in self.scalars or key0 in self.arrays or key0 in self.metadata

        if key0 in self.metadata:
            return ".".join(keys) in self.metadata[key0]
        return False

    def __delitem__(self, key: str) -> None:
        """Remove the specified item.

        Raises
        ------
        KeyError
            Raised if the item is not present.
        """
        keys = self._getKeys(key)
        key0 = keys.pop(0)
        if len(keys) == 0:
            # MyPy can't figure out that this way to combine the types in the
            # tuple is the one that matters, and annotating a local variable
            # helps it out.
            slots: tuple[dict[str, Any], ...] = (self.scalars, self.arrays, self.metadata)
            for slot in slots:
                if key0 in slot:
                    del slot[key0]
                    return
            # Bug fix: the message previously carried a stray trailing
            # apostrophe ("not found'"), inconsistent with other accessors.
            raise KeyError(f"'{key}' not found")

        try:
            del self.metadata[key0][".".join(keys)]
        except KeyError:
            # Report the correct key.
            raise KeyError(f"'{key}' not found") from None

    def get_dict(self, key: str) -> NestedMetadataDict:
        """Return a possibly-hierarchical nested `dict`.

        This implements the `GetDictMetadata` protocol for consistency with
        `lsst.daf.base.PropertySet` and `lsst.daf.base.PropertyList`. The
        returned `dict` is guaranteed to be a deep copy, not a view.

        Parameters
        ----------
        key : `str`
            String key associated with the mapping. May not have a ``.``
            character.

        Returns
        -------
        value : `~collections.abc.Mapping`
            Possibly-nested mapping, with `str` keys and values that are `int`,
            `float`, `str`, `bool`, or another `dict` with the same key and
            value types. Will be empty if ``key`` does not exist.
        """
        # Only guaranteed for values stored via set_dict(); a non-empty
        # scalar at this key would fail the to_dict() call.
        if value := self.get(key):
            return value.to_dict()
        else:
            return {}

    def set_dict(self, key: str, value: NestedMetadataDict) -> None:
        """Assign a possibly-hierarchical nested `dict`.

        This implements the `SetDictMetadata` protocol for consistency with
        `lsst.daf.base.PropertySet` and `lsst.daf.base.PropertyList`.

        Parameters
        ----------
        key : `str`
            String key associated with the mapping. May not have a ``.``
            character.
        value : `~collections.abc.Mapping`
            Possibly-nested mapping, with `str` keys and values that are `int`,
            `float`, `str`, `bool`, or another `dict` with the same key and
            value types. Nested keys may not have a ``.`` character.
        """
        self[key] = value

    def _validate_value(self, value: Any) -> tuple[str, Any]:
        """Validate the given value.

        Parameters
        ----------
        value : Any
            Value to check.

        Returns
        -------
        slot_type : `str`
            The type of value given. Options are "scalar", "array", "metadata".
        item : Any
            The item that was given but possibly modified to conform to
            the slot type.

        Raises
        ------
        ValueError
            Raised if the value is not a recognized type.
        """
        # Test the simplest option first. Exact type check (not isinstance)
        # so that e.g. numpy scalars fall through to the conversions below.
        value_type = type(value)
        if value_type in _ALLOWED_PRIMITIVE_TYPES:
            return "scalar", value

        if isinstance(value, TaskMetadata):
            return "metadata", value
        if isinstance(value, Mapping):
            return "metadata", self.from_dict(value)

        if _isListLike(value):
            # For model consistency, need to check that every item in the
            # list has the same type.
            value = list(value)
            if not value:
                return "array", value

            type0 = type(value[0])
            for i in value:
                if type(i) is not type0:
                    raise ValueError(
                        "Type mismatch in supplied list. TaskMetadata requires all"
                        f" elements have same type but see {type(i)} and {type0}."
                    )

            if type0 not in _ALLOWED_PRIMITIVE_TYPES:
                # Must check to see if we got numpy floats or something.
                type_cast: type
                if isinstance(value[0], numbers.Integral):
                    type_cast = int
                elif isinstance(value[0], numbers.Real):
                    type_cast = float
                else:
                    raise ValueError(
                        f"Supplied list has element of type '{type0}'. "
                        "TaskMetadata can only accept primitive types in lists."
                    )

                value = [type_cast(v) for v in value]

            return "array", value

        # Sometimes a numpy number is given.
        if isinstance(value, numbers.Integral):
            value = int(value)
            return "scalar", value
        if isinstance(value, numbers.Real):
            value = float(value)
            return "scalar", value

        raise ValueError(f"TaskMetadata does not support values of type {value!r}.")

    # Work around the fact that Sphinx chokes on Pydantic docstring formatting,
    # when we inherit those docstrings in our public classes.
    if "sphinx" in sys.modules:

        def copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.copy`."""
            return super().copy(*args, **kwargs)

        def model_dump(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump`."""
            return super().model_dump(*args, **kwargs)

        def model_dump_json(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_dump_json`."""
            # Bug fix: this previously delegated to model_dump(), returning
            # a dict instead of a JSON string under Sphinx builds.
            return super().model_dump_json(*args, **kwargs)

        def model_copy(self, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_copy`."""
            return super().model_copy(*args, **kwargs)

        @classmethod
        def model_construct(cls, *args: Any, **kwargs: Any) -> Any:  # type: ignore[misc, override]
            """See `pydantic.BaseModel.model_construct`."""
            return super().model_construct(*args, **kwargs)

        @classmethod
        def model_json_schema(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_json_schema`."""
            return super().model_json_schema(*args, **kwargs)

        @classmethod
        def model_validate(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate`."""
            return super().model_validate(*args, **kwargs)

        @classmethod
        def model_validate_json(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_json`."""
            return super().model_validate_json(*args, **kwargs)

        @classmethod
        def model_validate_strings(cls, *args: Any, **kwargs: Any) -> Any:
            """See `pydantic.BaseModel.model_validate_strings`."""
            return super().model_validate_strings(*args, **kwargs)
# Needed because a TaskMetadata can contain a TaskMetadata: the "metadata"
# field uses a self-referential forward reference that Pydantic can only
# resolve once the class object exists.
TaskMetadata.model_rebuild()