Coverage for python / lsst / daf / butler / datastore / file_templates.py: 11%

317 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-14 23:37 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28"""Support for file template string expansion.""" 

29 

30from __future__ import annotations 

31 

32__all__ = ("FileTemplate", "FileTemplateValidationError", "FileTemplates", "FileTemplatesConfig") 

33 

34import logging 

35import os.path 

36import string 

37from collections.abc import Iterable, Mapping 

38from types import MappingProxyType 

39from typing import TYPE_CHECKING, Any, TypedDict, cast 

40 

41from .._config import Config 

42from .._config_support import LookupKey, processLookupConfigs 

43from .._dataset_ref import DatasetId, DatasetRef 

44from .._exceptions import ValidationError 

45from .._storage_class import StorageClass 

46from ..dimensions import DataCoordinate, DimensionGroup 

47 

48if TYPE_CHECKING: 

49 from .._dataset_type import DatasetType 

50 from ..dimensions import DimensionRecord, DimensionUniverse 

51 

52log = logging.getLogger(__name__) 

53 

54 

55class FileTemplateValidationError(ValidationError): 

56 """Exception for file template inconsistent with associated DatasetType.""" 

57 

58 pass 

59 

60 

61class FileTemplatesConfig(Config): 

62 """Configuration information for `FileTemplates`.""" 

63 

64 pass 

65 

66 

67class FieldDict(TypedDict): 

68 """Dictionary containing the grouped fields from a template.""" 

69 

70 standard: set[str] 

71 special: set[str] 

72 subfield: set[str] 

73 parent: set[str] 

74 

75 

76class FileTemplates: 

77 """Collection of `FileTemplate` templates. 

78 

79 Parameters 

80 ---------- 

81 config : `FileTemplatesConfig` or `str` 

82 Load configuration. 

83 default : `str`, optional 

84 If not `None`, a default template to use if no template has 

85 been specified explicitly in the configuration. 

86 universe : `DimensionUniverse` 

87 The set of all known dimensions, used to normalize any lookup keys 

88 involving dimensions. 

89 

90 Notes 

91 ----- 

92 The configuration can include one level of hierarchy where an 

93 instrument-specific section can be defined to override more general 

94 template specifications. This is represented in YAML using a 

95 key of form ``instrument<name>`` which can then define templates 

96 that will be returned if a `DatasetRef` contains a matching instrument 

97 name in the data ID. 

98 

99 A default fallback template can be specified using the key ``default``. 

100 Defaulting can be disabled in a child configuration by defining the 

101 value to be an empty string or a boolean `False`. 

102 

103 The config is parsed using the function 

104 `~lsst.daf.butler.configSubset.processLookupConfigs`. 

105 """ 

106 

107 defaultKey = LookupKey("default") 

108 """Configuration key associated with the default template.""" 

109 

110 def __init__( 

111 self, 

112 config: FileTemplatesConfig | str, 

113 default: str | None = None, 

114 *, 

115 universe: DimensionUniverse, 

116 ): 

117 self.config = FileTemplatesConfig(config) 

118 self._templates = {} 

119 

120 contents = processLookupConfigs(self.config, universe=universe) 

121 

122 # Determine default to use -- defaults can be disabled if 

123 # we get a False or None 

124 defaultValue = contents.get(self.defaultKey, default) 

125 if defaultValue and not isinstance(defaultValue, str): 

126 raise RuntimeError( 

127 f"Default template value should be str or False, or None. Got '{defaultValue}'" 

128 ) 

129 self.default = FileTemplate(defaultValue) if isinstance(defaultValue, str) and defaultValue else None 

130 

131 # Convert all the values to FileTemplate, handling defaults 

132 for key, templateStr in contents.items(): 

133 if key == self.defaultKey: 

134 continue 

135 if not isinstance(templateStr, str): 

136 raise RuntimeError(f"Unexpected value in file template key {key}: {templateStr}") 

137 self._templates[key] = FileTemplate(templateStr) 

138 

139 @property 

140 def templates(self) -> Mapping[LookupKey, FileTemplate]: 

141 """Return collection of templates indexed by lookup key (`dict`).""" 

142 return MappingProxyType(self._templates) 

143 

144 def __contains__(self, key: LookupKey) -> bool: 

145 """Indicate whether the supplied key is present in the templates. 

146 

147 Parameters 

148 ---------- 

149 key : `LookupKey` 

150 Key to use to determine if a corresponding value is present 

151 in the templates. 

152 

153 Returns 

154 ------- 

155 in : `bool` 

156 `True` if the supplied key is present in the templates. 

157 """ 

158 return key in self.templates 

159 

160 def __getitem__(self, key: LookupKey) -> FileTemplate: 

161 return self.templates[key] 

162 

163 def validateTemplates( 

164 self, entities: Iterable[DatasetType | DatasetRef | StorageClass], logFailures: bool = False 

165 ) -> None: 

166 """Validate the templates. 

167 

168 Retrieves the template associated with each dataset type and 

169 validates the dimensions against the template. 

170 

171 Parameters 

172 ---------- 

173 entities : `DatasetType`, `DatasetRef`, or `StorageClass` 

174 Entities to validate against the matching templates. Can be 

175 differing types. 

176 logFailures : `bool`, optional 

177 If `True`, output a log message for every validation error 

178 detected. 

179 

180 Raises 

181 ------ 

182 FileTemplateValidationError 

183 Raised if an entity failed validation. 

184 

185 Notes 

186 ----- 

187 See `FileTemplate.validateTemplate()` for details on the validation. 

188 """ 

189 unmatchedKeys = set(self.templates) 

190 failed = [] 

191 for entity in entities: 

192 try: 

193 matchKey, template = self.getTemplateWithMatch(entity) 

194 except KeyError as e: 

195 # KeyError always quotes on stringification so strip here 

196 errMsg = str(e).strip("\"'") 

197 failed.append(errMsg) 

198 if logFailures: 

199 log.critical("%s", errMsg) 

200 continue 

201 

202 if matchKey in unmatchedKeys: 

203 unmatchedKeys.remove(matchKey) 

204 

205 try: 

206 template.validateTemplate(entity) 

207 except FileTemplateValidationError as e: 

208 failed.append(f"{e} (via key '{matchKey}')") 

209 if logFailures: 

210 log.critical("Template failure with key '%s': %s", matchKey, e) 

211 

212 if logFailures and unmatchedKeys: 

213 log.warning("Unchecked keys: '%s'", ", ".join([str(k) for k in unmatchedKeys])) 

214 

215 if failed: 

216 if len(failed) == 1: 

217 msg = str(failed[0]) 

218 else: 

219 failMsg = ";\n".join(failed) 

220 msg = f"{len(failed)} template validation failures: {failMsg}" 

221 raise FileTemplateValidationError(msg) 

222 

223 def getLookupKeys(self) -> set[LookupKey]: 

224 """Retrieve the look up keys for all the template entries. 

225 

226 Returns 

227 ------- 

228 keys : `set` of `LookupKey` 

229 The keys available for matching a template. 

230 """ 

231 return set(self.templates) 

232 

233 def getTemplateWithMatch( 

234 self, entity: DatasetRef | DatasetType | StorageClass 

235 ) -> tuple[LookupKey, FileTemplate]: 

236 """Retrieve the `FileTemplate` associated with the dataset type. 

237 

238 Also retrieves the lookup key that was a match for this template. 

239 

240 If the lookup name corresponds to a component the base name for 

241 the component will be examined if the full component name does 

242 not match. 

243 

244 Parameters 

245 ---------- 

246 entity : `DatasetType`, `DatasetRef`, or `StorageClass` 

247 Instance to use to look for a corresponding template. 

248 A `DatasetType` name or a `StorageClass` name will be used 

249 depending on the supplied entity. Priority is given to a 

250 `DatasetType` name. Supports instrument override if a 

251 `DatasetRef` is provided configured with an ``instrument`` 

252 value for the data ID. 

253 

254 Returns 

255 ------- 

256 matchKey : `LookupKey` 

257 The key that resulted in the successful match. 

258 template : `FileTemplate` 

259 Template instance to use with that dataset type. 

260 

261 Raises 

262 ------ 

263 KeyError 

264 Raised if no template could be located for this Dataset type. 

265 """ 

266 # Get the names to use for lookup 

267 names = entity._lookupNames() 

268 

269 # Get a location from the templates 

270 template = self.default 

271 source = self.defaultKey 

272 for name in names: 

273 if name in self.templates: 

274 template = self.templates[name] 

275 source = name 

276 break 

277 

278 if template is None: 

279 raise KeyError(f"Unable to determine file template from supplied argument [{entity}]") 

280 

281 log.debug("Got file %s from %s via %s", template, entity, source) 

282 

283 return source, template 

284 

285 def getTemplate(self, entity: DatasetType | DatasetRef | StorageClass) -> FileTemplate: 

286 """Retrieve the `FileTemplate` associated with the dataset type. 

287 

288 If the lookup name corresponds to a component the base name for 

289 the component will be examined if the full component name does 

290 not match. 

291 

292 Parameters 

293 ---------- 

294 entity : `DatasetType`, `DatasetRef`, or `StorageClass` 

295 Instance to use to look for a corresponding template. 

296 A `DatasetType` name or a `StorageClass` name will be used 

297 depending on the supplied entity. Priority is given to a 

298 `DatasetType` name. Supports instrument override if a 

299 `DatasetRef` is provided configured with an ``instrument`` 

300 value for the data ID. 

301 

302 Returns 

303 ------- 

304 template : `FileTemplate` 

305 Template instance to use with that dataset type. 

306 

307 Raises 

308 ------ 

309 KeyError 

310 Raised if no template could be located for this Dataset type. 

311 """ 

312 _, template = self.getTemplateWithMatch(entity) 

313 return template 

314 

315 

316class FileTemplate: 

317 """Format a path template into a fully expanded path. 

318 

319 Parameters 

320 ---------- 

321 template : `str` 

322 Template string. 

323 

324 Raises 

325 ------ 

326 FileTemplateValidationError 

327 Raised if the template fails basic validation. 

328 

329 Notes 

330 ----- 

331 The templates use the standard Format Specification Mini-Language 

332 with the caveat that only named fields can be used. The field names 

333 are taken from the Dimensions along with several additional fields: 

334 

335 - datasetType: `str`, `DatasetType.name` 

336 - component: `str`, name of the StorageClass component 

337 - run: `str`, name of the run this dataset was added with 

338 

339 `run` must always be provided to ensure unique paths. 

340 

341 More detailed information can be requested from dimensions by using a dot 

342 notation, so ``visit.name`` would use the name of the visit and 

343 ``detector.name_in_raft`` would use the name of the detector within the 

344 raft. 

345 

346 In some cases the template may want to support multiple options for a 

347 single part of the template. For example, you may not want to include 

348 ``group`` if ``exposure`` is in the data ID. To handle this situation a 

349 ``|`` character can be used to specify multiple data Id keys in the 

350 same format specifier. For example ``{exposure.obs_id|group}`` would 

351 choose ``exposure.obs_id`` if ``exposure`` is in the data ID but otherwise 

352 would use ``group``. 

353 

354 The mini-language is extended to understand a "?" in the format 

355 specification. This indicates that a field is optional. If that 

356 Dimension is missing the field, along with the text before the field, 

357 unless it is a path separator, will be removed from the output path. 

358 

359 By default any "/" in a dataId value will be replaced by "_" to prevent 

360 unexpected directories being created in the path. If the "/" should be 

361 retained then a special "/" format specifier can be included in the 

362 template. 

363 """ 

364 

365 mandatoryFields = {"run", "id"} 

366 """A set of fields, one of which must be present in a template.""" 

367 

368 datasetFields = {"datasetType", "component"} 

369 """Fields related to the supplied dataset, not a dimension.""" 

370 

371 specialFields = mandatoryFields | datasetFields 

372 """Set of special fields that are available independently of the defined 

373 Dimensions.""" 

374 

375 def __init__(self, template: str): 

376 if not isinstance(template, str): 

377 raise FileTemplateValidationError( 

378 f"Template ('{template}') does not contain any format specifiers" 

379 ) 

380 self.template = template 

381 

382 # Do basic validation without access to dimensions 

383 self.validateTemplate(None) 

384 

385 def __eq__(self, other: Any) -> bool: 

386 if not isinstance(other, FileTemplate): 

387 return False 

388 

389 return self.template == other.template 

390 

391 def __str__(self) -> str: 

392 return self.template 

393 

394 def __repr__(self) -> str: 

395 return f'{self.__class__.__name__}("{self.template}")' 

396 

397 def grouped_fields(self, dimensions: DimensionGroup | None = None) -> tuple[FieldDict, FieldDict]: 

398 """Return all the fields, grouped by their type. 

399 

400 Parameters 

401 ---------- 

402 dimensions : `lsst.daf.butler.DimensionGroup` or `None` 

403 If present, can be used to filter unknown or unused dimensions out 

404 of the template when alternates are used. This allows a template to 

405 have newer dimensions within it that are not known to an older 

406 universe so long as an alternative is given that works with an 

407 older universe. If none of the alternates are present in the 

408 dimensions the first will be returned. The caller can determine how 

409 to handle the situation. 

410 

411 Returns 

412 ------- 

413 grouped : `FieldDict` 

414 The fields grouped by their type. The keys for this dict are 

415 ``standard``, ``special``, ``subfield``, and 

416 ``parent``. If field ``a.b`` is present, ``a`` will not be 

417 included in ``standard`` but will be included in ``parent``. 

418 grouped_optional : `FieldDict` 

419 As for ``grouped`` but the optional fields. 

420 """ 

421 fmt = string.Formatter() 

422 parts = fmt.parse(self.template) 

423 

424 grouped: FieldDict = { 

425 "standard": set(), 

426 "special": set(), 

427 "subfield": set(), 

428 "parent": set(), 

429 } 

430 grouped_optional: FieldDict = { 

431 "standard": set(), 

432 "special": set(), 

433 "subfield": set(), 

434 "parent": set(), 

435 } 

436 

437 for _, field_names, format_spec, _ in parts: 

438 if field_names is not None and format_spec is not None: 

439 # Determine which fields are in the dimension universe. 

440 given_fields = field_names.split("|") 

441 validated_fields: list[str] = [] 

442 if dimensions is not None: 

443 for field in given_fields: 

444 if "." in field: 

445 field_name, _ = field.split(".") 

446 else: 

447 field_name = field 

448 if field_name in dimensions or field_name in self.specialFields: 

449 # Found one that is in the relevant dimensions 

450 # so stop searching. 

451 validated_fields.append(field) 

452 break 

453 if not validated_fields: 

454 # None of them were in the dimensions or we had no 

455 # dimensions. Use all of them below and let the caller work 

456 # it (some of these may be skypix). 

457 validated_fields = given_fields 

458 

459 if "?" in format_spec: 

460 target = grouped_optional 

461 else: 

462 target = grouped 

463 

464 for field_name in validated_fields: # Treat alternates as equals. 

465 subfield = None 

466 if field_name in self.specialFields: 

467 field_set = target["special"] 

468 elif "." in field_name: 

469 # This needs to be added twice. 

470 subfield = field_name 

471 field_set = target["parent"] 

472 field_name, _ = field_name.split(".") 

473 target["subfield"].add(subfield) 

474 else: 

475 field_set = target["standard"] 

476 

477 field_set.add(field_name) 

478 

479 return grouped, grouped_optional 

480 

481 def fields(self, optionals: bool = False, specials: bool = False, subfields: bool = False) -> set[str]: 

482 """Return the field names used in this template. 

483 

484 Parameters 

485 ---------- 

486 optionals : `bool` 

487 If `True`, optional fields are included in the returned set. 

488 specials : `bool` 

489 If `True`, non-dimension fields are included. 

490 subfields : `bool`, optional 

491 If `True`, fields with syntax ``a.b`` are included. If `False`, 

492 the default, only ``a`` would be returned. 

493 

494 Returns 

495 ------- 

496 names : `set` 

497 Names of fields used in this template. 

498 

499 Notes 

500 ----- 

501 The returned set will include the special values such as 

502 ``datasetType`` and ``component``. 

503 """ 

504 fmt = string.Formatter() 

505 parts = fmt.parse(self.template) 

506 

507 names = set() 

508 for _, field_names, format_spec, _ in parts: 

509 if field_names is not None and format_spec is not None: 

510 if not optionals and "?" in format_spec: 

511 continue 

512 for field_name in field_names.split("|"): 

513 if not specials and field_name in self.specialFields: 

514 continue 

515 

516 if not subfields and "." in field_name: 

517 field_name, _ = field_name.split(".") 

518 

519 names.add(field_name) 

520 

521 return names 

522 

523 def format(self, ref: DatasetRef) -> str: 

524 """Format a template string into a full path. 

525 

526 Parameters 

527 ---------- 

528 ref : `DatasetRef` 

529 The dataset to be formatted. 

530 

531 Returns 

532 ------- 

533 path : `str` 

534 Expanded path. 

535 

536 Raises 

537 ------ 

538 KeyError 

539 Raised if the requested field is not defined and the field is 

540 not optional. Or, `component` is specified but "component" was 

541 not part of the template. 

542 RuntimeError 

543 Raised if a template uses dimension record metadata but no 

544 records are attached to the `DatasetRef`. 

545 """ 

546 # Get the dimension values. Should all be non None. 

547 # Will want to store a DatasetId in it later. 

548 fields = cast(dict[str, int | str | DatasetId], dict(ref.dataId.mapping)) 

549 # Extra information that can be included using . syntax 

550 extras: dict[str, DimensionRecord | None] = {} 

551 skypix_alias: str | None = None 

552 can_use_extra_records = False 

553 if isinstance(ref.dataId, DataCoordinate): 

554 if ref.dataId.hasRecords(): 

555 can_use_extra_records = True 

556 skypix_alias = self._determine_skypix_alias(ref) 

557 if skypix_alias is not None: 

558 fields["skypix"] = fields[skypix_alias] 

559 

560 datasetType = ref.datasetType 

561 fields["datasetType"], component = datasetType.nameAndComponent() 

562 

563 usedComponent = False 

564 if component is not None: 

565 fields["component"] = component 

566 

567 fields["run"] = ref.run 

568 fields["id"] = ref.id 

569 

570 fmt = string.Formatter() 

571 parts = fmt.parse(self.template) 

572 output = "" 

573 

574 for literal, field_name, format_spec, conversion in parts: 

575 if field_name and "|" in field_name: 

576 alternates = field_name.split("|") 

577 for alt in alternates: 

578 if "." in alt: 

579 primary, _ = alt.split(".") 

580 else: 

581 primary = alt 

582 # If the alternate is known to this data ID then we use 

583 # it and drop the lower priority fields. 

584 if primary in fields: 

585 field_name = alt 

586 break 

587 else: 

588 # None of these were found in the field list. Select the 

589 # first and let downstream code handle whether this 

590 # is optional or not. 

591 field_name = alternates[0] 

592 

593 if field_name == "component": 

594 usedComponent = True 

595 

596 if format_spec is None: 

597 output = output + literal 

598 continue 

599 

600 # Should only happen if format_spec is None 

601 if field_name is None: 

602 raise RuntimeError(f"Unexpected blank field_name encountered in {self.template} [{literal}]") 

603 

604 if "?" in format_spec: 

605 optional = True 

606 # Remove the non-standard character from the spec 

607 format_spec = format_spec.replace("?", "") 

608 else: 

609 optional = False 

610 

611 # Check for request for additional information from the dataId 

612 if "." in field_name: 

613 primary, secondary = field_name.split(".") 

614 if can_use_extra_records and primary not in extras and primary in fields: 

615 record_key = primary 

616 if primary == "skypix" and skypix_alias is not None: 

617 record_key = skypix_alias 

618 extras[record_key] = ref.dataId.records[record_key] 

619 if record_key != primary: 

620 # Make sure that htm7 and skypix both work. 

621 extras[primary] = extras[record_key] 

622 

623 if primary in extras: 

624 record = extras[primary] 

625 # Only fill in the fields if we have a value, the 

626 # KeyError will trigger below if the attribute is missing, 

627 # but only if it is not optional. This is most likely 

628 # a typo in the metadata field and so should be reported 

629 # even if optional. 

630 if hasattr(record, secondary): 

631 fields[field_name] = getattr(record, secondary) 

632 else: 

633 # Is a log message sufficient? 

634 log.info( 

635 "Template field %s could not be resolved because metadata field %s" 

636 " is not understood for dimension %s. Template entry will be ignored", 

637 field_name, 

638 secondary, 

639 primary, 

640 ) 

641 elif primary in fields: 

642 # We do have an entry for the primary but do not have any 

643 # secondary entries. This is likely a problem with the 

644 # code failing to attach a record to the DatasetRef. 

645 raise RuntimeError( 

646 f"No metadata records attached to dataset {ref}" 

647 f" when attempting to expand field {field_name}." 

648 " Either expand the DatasetRef or change the template." 

649 ) 

650 

651 if field_name in fields: 

652 value = fields[field_name] 

653 elif optional: 

654 # If this is optional ignore the format spec 

655 # and do not include the literal text prior to the optional 

656 # field unless it contains a "/" path separator 

657 format_spec = "" 

658 value = "" 

659 if "/" not in literal: 

660 literal = "" 

661 else: 

662 raise KeyError( 

663 f"'{field_name}' requested in template via '{self.template}' " 

664 "but not defined and not optional" 

665 ) 

666 

667 # Handle "/" in values since we do not want to be surprised by 

668 # unexpected directories turning up 

669 replace_slash = True 

670 if "/" in format_spec: 

671 # Remove the non-standard character from the spec 

672 format_spec = format_spec.replace("/", "") 

673 replace_slash = False 

674 

675 if isinstance(value, str): 

676 # Replace spaces with underscores for more friendly file paths 

677 value = value.replace(" ", "_") 

678 if replace_slash: 

679 value = value.replace("/", "_") 

680 

681 # Apply conversion (e.g., integer to string) 

682 if conversion: 

683 value = fmt.convert_field(value, conversion) 

684 

685 # Now use standard formatting 

686 output = output + literal + format(value, format_spec) 

687 

688 # Replace periods with underscores in the non-directory part to 

689 # prevent file extension confusion. Also replace # in the non-dir 

690 # part to avoid confusion with URI fragments 

691 head, tail = os.path.split(output) 

692 tail = tail.replace(".", "_") 

693 tail = tail.replace("#", "HASH") 

694 output = os.path.join(head, tail) 

695 

696 # Complain if we were meant to use a component 

697 if component is not None and not usedComponent: 

698 raise KeyError(f"Component '{component}' specified but template {self.template} did not use it") 

699 

700 # Since this is known to be a path, normalize it in case some double 

701 # slashes have crept in 

702 path = os.path.normpath(output) 

703 

704 # It should not be an absolute path (may happen with optionals) 

705 if os.path.isabs(path): 

706 path = os.path.relpath(path, start="/") 

707 

708 return path 

709 

710 def validateTemplate(self, entity: DatasetRef | DatasetType | StorageClass | None) -> None: 

711 """Compare the template against supplied entity that wants to use it. 

712 

713 Parameters 

714 ---------- 

715 entity : `DatasetType`, `DatasetRef`, or `StorageClass` 

716 Entity to compare against template. If `None` is given only 

717 very basic validation of templates will be performed. 

718 

719 Raises 

720 ------ 

721 FileTemplateValidationError 

722 Raised if the template is inconsistent with the supplied entity. 

723 

724 Notes 

725 ----- 

726 Validation will always include a check that mandatory fields 

727 are present and that at least one field refers to a dimension. 

728 If the supplied entity includes a `DimensionGroup` then it will be 

729 used to compare the available dimensions with those specified in the 

730 template. 

731 """ 

732 # A universe can be used to filter out alternates that are 

733 # not known. 

734 dimensions = getattr(entity, "dimensions", None) 

735 grouped_fields, grouped_optionals = self.grouped_fields(dimensions) 

736 

737 # Check that the template has run 

738 withSpecials = ( 

739 grouped_fields["standard"] 

740 | grouped_fields["parent"] 

741 | grouped_fields["special"] 

742 | grouped_optionals["standard"] 

743 | grouped_optionals["parent"] 

744 | grouped_optionals["special"] 

745 ) 

746 

747 if "collection" in withSpecials: 

748 raise FileTemplateValidationError( 

749 "'collection' is no longer supported as a file template placeholder; use 'run' instead." 

750 ) 

751 

752 if not withSpecials & self.mandatoryFields: 

753 raise FileTemplateValidationError( 

754 f"Template '{self}' is missing a mandatory field from {self.mandatoryFields}" 

755 ) 

756 

757 # Check that there are some dimension fields in the template 

758 # The id is allowed instead if present since that also uniquely 

759 # identifies the file in the datastore. 

760 allfields = ( 

761 grouped_fields["standard"] 

762 | grouped_fields["parent"] 

763 | grouped_optionals["standard"] 

764 | grouped_optionals["parent"] 

765 ) 

766 if not allfields and "id" not in withSpecials: 

767 raise FileTemplateValidationError( 

768 f"Template '{self}' does not seem to have any fields corresponding to dimensions." 

769 ) 

770 

771 # Do not allow ../ in the template to confuse where the file might 

772 # end up. 

773 if "../" in self.template: 

774 raise FileTemplateValidationError("A file template should not include jump to parent directory.") 

775 

776 # Require that if "id" is in the template then it must exist in the 

777 # file part -- this avoids templates like "{id}/fixed" where the file 

778 # name is fixed but the directory has the ID. 

779 if "id" in withSpecials: 

780 file_part = os.path.split(self.template)[-1] 

781 if "{id}" not in file_part: 

782 raise FileTemplateValidationError( 

783 f"Template '{self}' includes the 'id' but that ID is not part of the file name." 

784 ) 

785 

786 # If we do not have dimensions available then all we can do is shrug 

787 if not hasattr(entity, "dimensions"): 

788 return 

789 

790 # Mypy does not know about hasattr so help it out 

791 if entity is None: 

792 return 

793 

794 # if this entity represents a component then insist that component 

795 # is present in the template. If the entity is not a component 

796 # make sure that component is not mandatory. 

797 try: 

798 # mypy does not see the except block so complains about 

799 # StorageClass not supporting isComponent 

800 if entity.isComponent(): # type: ignore 

801 if "component" not in withSpecials: 

802 raise FileTemplateValidationError( 

803 f"Template '{self}' has no component but {entity} refers to a component." 

804 ) 

805 else: 

806 mandatorySpecials = ( 

807 grouped_fields["standard"] | grouped_fields["parent"] | grouped_fields["special"] 

808 ) 

809 if "component" in mandatorySpecials: 

810 raise FileTemplateValidationError( 

811 f"Template '{self}' has mandatory component but " 

812 f"{entity} does not refer to a component." 

813 ) 

814 except AttributeError: 

815 pass 

816 

817 # From here on we need at least a DatasetType 

818 # Mypy doesn't understand the AttributeError clause below 

819 if isinstance(entity, StorageClass): 

820 return 

821 

822 # Get the dimension links to get the full set of available field names 

823 # Fall back to dataId keys if we have them but no links. 

824 # dataId keys must still be present in the template 

825 try: 

826 minimal = set(entity.dimensions.required) 

827 maximal = set(entity.dimensions.names) 

828 except AttributeError: 

829 try: 

830 minimal = set(entity.dataId.keys().names) # type: ignore 

831 maximal = minimal 

832 except AttributeError: 

833 return 

834 

835 required = grouped_fields["standard"] | grouped_fields["parent"] 

836 

837 # Replace specific skypix dimensions with generic one 

838 skypix_alias = self._determine_skypix_alias(entity) 

839 if skypix_alias is not None: 

840 minimal.add("skypix") 

841 maximal.add("skypix") 

842 minimal.remove(skypix_alias) 

843 maximal.remove(skypix_alias) 

844 if skypix_alias in required: 

845 required.remove(skypix_alias) 

846 required.add("skypix") 

847 if skypix_alias in allfields: 

848 allfields.remove(skypix_alias) 

849 allfields.add("skypix") 

850 

851 # Calculate any field usage that does not match a dimension 

852 if not required.issubset(maximal): 

853 raise FileTemplateValidationError( 

854 f"Template '{self}' is inconsistent with {entity}: {required} is not a subset of {maximal}." 

855 ) 

856 

857 if not allfields.issuperset(minimal): 

858 raise FileTemplateValidationError( 

859 f"Template '{self}' is inconsistent with {entity}:" 

860 f" {allfields} is not a superset of {minimal}." 

861 ) 

862 

863 return 

864 

865 def _determine_skypix_alias(self, entity: DatasetRef | DatasetType) -> str | None: 

866 """Return the dimension name that refers to a sky pixel. 

867 

868 Parameters 

869 ---------- 

870 entity : `DatasetRef` or `DatasetType` 

871 The entity to examine. 

872 

873 Returns 

874 ------- 

875 alias : `str` 

876 If there is a sky pixelization in the supplied dataId, return 

877 its name, else returns `None`. Will return `None` also if there 

878 is more than one sky pix dimension in the data ID or if the 

879 dataID is not a `DataCoordinate` 

880 """ 

881 alias = None 

882 

883 if isinstance(entity, DatasetRef): 

884 entity = entity.datasetType 

885 

886 # If there is exactly one SkyPixDimension in the data ID, alias its 

887 # value with the key "skypix", so we can use that to match any 

888 # skypix dimension. 

889 # We restrict this behavior to the (real-world) case where the 

890 # data ID is a DataCoordinate, not just a dict. That should only 

891 # not be true in some test code, but that test code is a pain to 

892 # update to be more like the real world while still providing our 

893 # only tests of important behavior. 

894 if len(entity.dimensions.skypix) == 1: 

895 (alias,) = entity.dimensions.skypix 

896 return alias