Coverage for python / lsst / daf / butler / _dataset_provenance.py: 11%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-26 08:49 +0000

# This file is part of daf_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively.  If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

27 

from __future__ import annotations

__all__ = ("DatasetProvenance",)

import re
import uuid
from typing import TYPE_CHECKING, Any, Self, TypeAlias

import pydantic

from ._dataset_ref import DatasetRef, SerializedDatasetRef
from .dimensions import DataIdValue

if TYPE_CHECKING:
    from collections.abc import Mapping, MutableMapping

    from ._butler import Butler

# Types that can be stored in a provenance dictionary.
_PROV_TYPES: TypeAlias = str | int | float | bool | DataIdValue | uuid.UUID

48 

49 

class DatasetProvenance(pydantic.BaseModel):
    """Provenance of a single `DatasetRef`."""

    inputs: list[SerializedDatasetRef] = pydantic.Field(default_factory=list)
    """The input datasets."""
    quantum_id: uuid.UUID | None = None
    """Identifier of the Quantum that was executed."""
    extras: dict[uuid.UUID, dict[str, _PROV_TYPES]] = pydantic.Field(default_factory=dict)
    """Extra provenance information associated with a particular dataset."""
    # Private cache of input dataset IDs for O(1) duplicate detection.
    _uuids: set[uuid.UUID] = pydantic.PrivateAttr(default_factory=set)

    @pydantic.model_validator(mode="after")
    def populate_cache(self) -> Self:
        """Seed the private UUID cache from inputs supplied directly to the
        model constructor so that `add_input` can detect duplicates.
        """
        for ref in self.inputs:
            self._uuids.add(ref.id)
        return self

    def add_input(self, ref: DatasetRef) -> None:
        """Add an input dataset to the provenance.

        Parameters
        ----------
        ref : `DatasetRef`
            A dataset to register as an input.
        """
        if ref.id in self._uuids:
            # Already registered.
            return
        self._uuids.add(ref.id)
        self.inputs.append(ref.to_simple())

    def add_extra_provenance(self, dataset_id: uuid.UUID, extra: Mapping[str, _PROV_TYPES]) -> None:
        """Attach extra provenance to a specific dataset.

        Parameters
        ----------
        dataset_id : `uuid.UUID`
            The ID of the dataset to receive this provenance. Must already
            have been registered via `add_input` (or the constructor).
        extra : `~collections.abc.Mapping` [ `str`, `typing.Any` ]
            The extra provenance information as a dictionary. The values
            must be simple Python scalars or scalars that can be serialized
            by Pydantic and convert to a simple string value.

        Raises
        ------
        ValueError
            Raised if the dataset ID is not known to this provenance, or if
            a reserved key is used in ``extra``.

        Notes
        -----
        The keys in the extra provenance can not include provenance keys of
        ``run``, ``id``, or ``datasettype`` (in upper or lower case).
        """
        if dataset_id not in self._uuids:
            raise ValueError(f"The given dataset ID {dataset_id} is not known to this provenance instance.")
        # Reserved keys are matched case-insensitively.
        extra_keys = {k.lower() for k in extra.keys()}
        if overlap := (extra_keys & {"run", "id", "datasettype"}):
            raise ValueError(f"Extra provenance includes a reserved provenance key: {overlap}")

        self.extras.setdefault(dataset_id, {}).update(extra)

    def to_flat_dict(
        self,
        ref: DatasetRef | None,
        /,
        *,
        prefix: str = "",
        sep: str = ".",
        simple_types: bool = False,
        use_upper: bool | None = None,
        max_inputs: int | None = None,
        store_minimalist_inputs: bool = False,
    ) -> dict[str, _PROV_TYPES]:
        """Return provenance as a flattened dictionary.

        Parameters
        ----------
        ref : `DatasetRef` or `None`
            If given, a dataset for which this provenance is relevant and
            should be included.
        prefix : `str`, optional
            A prefix to use for each key in the provenance dictionary.
        sep : `str`, optional
            Separator to use to represent hierarchy. Must be a single
            character. Can not be a number, letter, or underscore (to avoid
            confusion with provenance keys themselves).
        simple_types : `bool`, optional
            If `True` only simple Python types will be used in the returned
            dictionary, specifically UUIDs will be returned as `str`. If
            `False`, UUIDs will be returned as `uuid.UUID`. Complex types
            found in `DatasetProvenance.extras` will be cast to a `str`
            if `True`.
        use_upper : `bool` or `None`, optional
            If `None` the case of the keys matches the case of the first
            character of the prefix (defined by whether `str.isupper` returns
            true, else they will be lower case). If `False` the case will be
            lower case, and if `True` the case will be upper case.
        max_inputs : `int` or `None`, optional
            Maximum number of inputs to be recorded in provenance. `None`
            results in all inputs being recorded. If the number of inputs
            exceeds this value no input provenance will be recorded.
        store_minimalist_inputs : `bool`, optional
            If `True` only the ID of the input is stored along with explicit
            extras. If `False` the run and dataset type are also recorded.

        Returns
        -------
        prov : `dict`
            Dictionary representing the provenance. The keys are defined
            in the notes below.

        Notes
        -----
        Keys from the given dataset (all optional if no dataset is given):

        :id: UUID of the given dataset.
        :run: Run of the given dataset.
        :datasettype: Dataset type of the given dataset.
        :dataid x: An entry for each required dimension, "x", in the data ID.

        Each input dataset will have the ``id``, ``run``, and ``datasettype``
        keys as defined above (but no ``dataid`` key) with an ``input N``
        prefix where ``N`` starts counting at 0. It is possible to drop
        the ``datasettype`` and ``run`` to save space by using the
        ``store_minimalist_inputs`` flag.

        If there are too many inputs (see the ``max_inputs`` parameters)
        no inputs will be recorded. The number of inputs is always recorded
        to indicate that the inputs were dropped.

        The quantum ID, if present, will use key ``quantum``.

        Examples
        --------
        >>> provenance.to_flat_dict(
        ...     ref, prefix="lsst.butler", sep=".", simple_types=True
        ... )
        {
            "lsst.butler.id": "ae0fa83d-cc89-41dd-9680-f97ede49f01e",
            "lsst.butler.run": "test_run",
            "lsst.butler.datasettype": "data",
            "lsst.butler.dataid.detector": 10,
            "lsst.butler.dataid.instrument": "LSSTCam",
            "lsst.butler.quantum": "d93a735b-08f0-477d-bc95-2cc32d6d898b",
            "lsst.butler.n_inputs": 2,
            "lsst.butler.input.0.id": "3dfd7ba5-5e35-4565-9d87-4b33880ed06c",
            "lsst.butler.input.0.run": "other_run",
            "lsst.butler.input.0.datasettype": "astropy_parquet",
            "lsst.butler.input.1.id": "7a99f6e9-4035-3d68-842e-58ecce1dc935",
            "lsst.butler.input.1.run": "other_run",
            "lsst.butler.input.1.datasettype": "astropy_parquet",
        }

        Raises
        ------
        ValueError
            Raised if the separator is not a single character, or is a
            word character or underscore.
        """
        if len(sep) != 1:
            raise ValueError(f"Separator for provenance keys must be a single character. Got {sep!r}.")
        # \w covers letters, digits, and underscore; any of those would be
        # ambiguous with the provenance key names themselves.
        if re.match(r"\w$", sep):
            raise ValueError(
                f"Separator for provenance keys can not be word character or underscore. Got {sep!r}."
            )

        def _make_key(*keys: str | int) -> str:
            """Make the key in the correct form with simpler API."""
            return self._make_provenance_key(prefix, sep, use_upper, *keys)

        prov: dict[str, _PROV_TYPES] = {}
        if ref is not None:
            prov[_make_key("id")] = ref.id if not simple_types else str(ref.id)
            prov[_make_key("run")] = ref.run
            prov[_make_key("datasettype")] = ref.datasetType.name
            # Sort data ID keys for deterministic output ordering.
            for k, v in sorted(ref.dataId.required.items()):
                prov[_make_key("dataid", k)] = v

        if self.quantum_id is not None:
            prov[_make_key("quantum")] = self.quantum_id if not simple_types else str(self.quantum_id)

        # Record the number of inputs so that people can determine how many
        # there were even if they were dropped because they exceeded the
        # allowed maximum. Do not record the count if we have a null provenance
        # state with no ref and no inputs.
        if ref is not None or len(self.inputs) > 0:
            prov[_make_key("n_inputs")] = len(self.inputs)

        # Remove all inputs if the maximum is exceeded. Truncating to the
        # maximum (or auto switching to minimalist mode and increasing the
        # maximum by 3) is not preferred.
        inputs = self.inputs if max_inputs is None or len(self.inputs) <= max_inputs else []
        for i, input_ref in enumerate(inputs):
            prov[_make_key("input", i, "id")] = input_ref.id if not simple_types else str(input_ref.id)
            if not store_minimalist_inputs:
                if input_ref.run is not None:  # for mypy
                    prov[_make_key("input", i, "run")] = input_ref.run
                if input_ref.datasetType is not None:  # for mypy
                    prov[_make_key("input", i, "datasettype")] = input_ref.datasetType.name

            if input_ref.id in self.extras:
                for xk, xv in self.extras[input_ref.id].items():
                    if simple_types and not isinstance(xv, str | float | int | bool):
                        xv = str(xv)
                    prov[_make_key("input", i, xk)] = xv

        return prov

    @staticmethod
    def _make_provenance_key(prefix: str, sep: str, use_upper: bool | None, *keys: str | int) -> str:
        """Construct provenance key from prefix and separator.

        Parameters
        ----------
        prefix : `str`
            A prefix to use for each key in the provenance dictionary.
        sep : `str`
            Separator to use to represent hierarchy. Must be a single
            character.
        use_upper : `bool` or `None`
            If `True` use upper case for provenance keys, if `False` use lower
            case, if `None` match the case of the prefix.
        *keys : `tuple` of `str` | `int`
            Components of key to combine with prefix and separator.

        Returns
        -------
        key : `str`
            Key to use in dictionary. Case of result will match case of
            prefix (defaulting to lower case if the first character of
            prefix has no case).
        """
        if use_upper is None:
            use_upper = prefix[0].isupper() if prefix else False
        if prefix:
            prefix += sep
        k = sep.join(str(kk) for kk in keys)
        if use_upper:
            k = k.upper()
        return f"{prefix}{k}"

    @staticmethod
    def _find_prefix_and_sep(prov_dict: Mapping[str, Any]) -> tuple[str, str] | tuple[None, None]:
        """Given a mapping try to determine the prefix and separator for
        provenance keys.

        Parameters
        ----------
        prov_dict : `collections.abc.Mapping`
            Mapping to scan. Assumed to include keys populated by
            `to_flat_dict`.

        Returns
        -------
        prefix : `str`
            Prefix given to `to_flat_dict`. `None` if no provenance headers
            were found.
        sep : `str`
            Separator given to `to_flat_dict`. `None` if no provenance headers
            were found.

        Raises
        ------
        ValueError
            Raised if more than one value was found for either the prefix or
            separator.
        """
        # Best keys to look for are dataid and input 0.
        # dataid is only used in ref provenance and always has a separator.
        # input 0 is always present if there is any input provenance.

        def _update_matches(match: re.Match, prefixes: set[str], separators: set[str]) -> None:
            prefix, *seps = match.groups()
            if prefix:
                # Will have a separator at the end.
                prefix, sep = prefix[:-1], prefix[-1]
                separators.add(sep)
            prefixes.add(prefix)
            separators |= set(seps)

        separators: set[str] = set()
        prefixes: set[str] = set()

        # It is possible for there to be no inputs and just a reference
        # dataset. If that reference dataset has no DATAID then the simple
        # logic will not work. In that scenario look for the presence
        # of RUN, DATASETTYPE and ID to spot that provenance exists.
        # In this case it may not be possible to determine a sep value.
        backup = {}

        for k in prov_dict:
            if match := re.match("(.*)input(.)0(.)(?:id|datasettype|run)$", k, flags=re.IGNORECASE):
                _update_matches(match, prefixes, separators)
            elif match := re.match(
                # Data coordinates are a-z or underscore.
                "(.*)dataid(.)[a-z_]+$",
                k,
                flags=re.IGNORECASE,
            ):
                _update_matches(match, prefixes, separators)
            elif match := re.match(r"(.*)\b(id|datasettype|run)$", k, flags=re.IGNORECASE):
                prefix, key = match.groups()
                backup[key.lower()] = prefix

        if not prefixes:
            if "run" in backup and "datasettype" in backup and "id" in backup:
                # Looks like there is a provenance after all. All 3 must be
                # present.
                for prefix in backup.values():
                    if prefix:
                        prefix, sep = prefix[:-1], prefix[-1]
                    else:
                        sep = " "  # Will not be used.
                    prefixes.add(prefix)
                    separators.add(sep)

        if not prefixes:
            return None, None

        if len(separators) > 1:
            raise ValueError(
                f"Inconsistent values found for separators in provenance header. Got {separators}."
            )
        if len(prefixes) > 1:
            raise ValueError(f"Inconsistent values for prefix found in provenance headers. Got {prefixes}.")
        return prefixes.pop(), separators.pop()

    @classmethod
    def _find_provenance_keys_in_flat_dict(cls, prov_dict: Mapping[str, Any]) -> dict[str, str]:
        """Find the provenance keys in a dictionary.

        Parameters
        ----------
        prov_dict : `collections.abc.Mapping`
            Dictionary to be analyzed. Assumed to have been populated by
            `to_flat_dict`.

        Returns
        -------
        prov_keys : `dict` [ `str`, `str` ]
            Provenance key as found in the given header mapping to the
            standardized provenance key (with prefix removed and "."
            separator).
        """
        prefix, sep = cls._find_prefix_and_sep(prov_dict)

        if prefix is None:
            return {}
        # for mypy which can not work out that the above method returns
        # str, str or None, None.
        if sep is None:
            return {}
        if prefix:
            # Prefix will always include the separator if it is defined.
            prefix += sep

        core_provenance = tuple(
            f"{prefix}{k}".lower() for k in ("run", "id", "datasettype", "quantum", "n_inputs")
        )

        # Need to escape the prefix and separator for regex usage.
        esc_sep = re.escape(sep)
        esc_prefix = re.escape(prefix)
        prov_keys: dict[str, str] = {}
        for k in prov_dict:
            # the input provenance can include extra keys that we cannot
            # know so have to match solely on INPUT N.
            found_key = False
            if re.match(rf"{esc_prefix}input{esc_sep}(\d+){esc_sep}(.*)$", k, flags=re.IGNORECASE):
                found_key = True
            elif k.lower() in core_provenance:
                found_key = True
            elif re.match(f"{esc_prefix}dataid{esc_sep}[a-z_]+$", k, flags=re.IGNORECASE):
                found_key = True

            if found_key:
                standard = k.removeprefix(prefix)
                standard = standard.replace(sep, ".")
                prov_keys[k] = standard.lower()

        return prov_keys

    @classmethod
    def strip_provenance_from_flat_dict(cls, prov_dict: MutableMapping[str, Any]) -> None:
        """Remove provenance keys from a mapping that had been populated
        by `to_flat_dict`.

        Parameters
        ----------
        prov_dict : `collections.abc.MutableMapping`
            Dictionary to modify.
        """
        for prov_key in cls._find_provenance_keys_in_flat_dict(prov_dict):
            del prov_dict[prov_key]

    @classmethod
    def from_flat_dict(cls, prov_dict: Mapping[str, Any], butler: Butler) -> tuple[Self, DatasetRef | None]:
        """Create a provenance object from a provenance dictionary.

        Parameters
        ----------
        prov_dict : `collections.abc.Mapping`
            Dictionary populated by `to_flat_dict`.
        butler : `lsst.daf.butler.Butler`
            Butler to query to find referenced datasets.

        Returns
        -------
        prov : `DatasetProvenance`
            Provenance extracted from this object.
        ref : `DatasetRef` or `None`
            Dataset associated with this provenance. Can be `None` if no
            provenance found.

        Raises
        ------
        ValueError
            Raised if no provenance values are found in the dictionary, or
            if a referenced dataset is not known to the given butler.
        """
        prov_keys = cls._find_provenance_keys_in_flat_dict(prov_dict)
        if not prov_keys:
            raise ValueError("No provenance information found in header.")

        def _coerce_id(id_: str | uuid.UUID) -> uuid.UUID:
            # Values may already be UUIDs (simple_types=False round trip).
            if isinstance(id_, uuid.UUID):
                return id_
            return uuid.UUID(hex=id_)

        quantum_id = None
        ref_id = None
        input_ids: dict[int, uuid.UUID] = {}
        extras: dict[int, dict[str, Any]] = {}

        for k, standard in prov_keys.items():
            if standard == "id":
                ref_id = _coerce_id(prov_dict[k])
            elif standard == "quantum":
                quantum_id = _coerce_id(prov_dict[k])
            elif match := re.match(r"input\.(\d+)\.([a-z_]+)$", standard):
                input_num = int(match.group(1))
                subkey = match.group(2)
                if subkey == "id":
                    input_ids[input_num] = _coerce_id(prov_dict[k])
                elif subkey not in ("datasettype", "run"):
                    # Extra information. Can not know the original case
                    # but can match to the original dictionary.
                    if k[0].isupper():
                        subkey = subkey.upper()
                    extras.setdefault(input_num, {})[subkey] = prov_dict[k]

        ref = None
        if ref_id is not None:
            ref = butler.get_dataset(ref_id)
            if ref is None:
                raise ValueError(
                    f"Dataset associated with this provenance ({ref_id}) is not known to this butler."
                )

        provenance = cls(quantum_id=quantum_id)

        # Fetch all inputs in one query, then attach in index order.
        input_refs = {ref.id: ref for ref in butler.get_many_datasets(input_ids.values())}
        for i in sorted(input_ids):
            input_ref = input_refs.get(input_ids[i])
            if input_ref is None:
                raise ValueError(f"Input dataset ({input_ids[i]}) is not known to this butler.")
            provenance.add_input(input_ref)
            if i in extras:
                provenance.add_extra_provenance(input_ref.id, extras[i])

        return provenance, ref