Coverage for python / lsst / daf / butler / _formatter.py: 23%

531 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-17 08:49 +0000

1# This file is part of daf_butler. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "FileIntegrityError", 

32 "Formatter", 

33 "FormatterFactory", 

34 "FormatterNotImplementedError", 

35 "FormatterParameter", 

36 "FormatterV1inV2", 

37 "FormatterV2", 

38) 

39 

40import contextlib 

41import copy 

42import logging 

43import os 

44import zipfile 

45from abc import ABCMeta, abstractmethod 

46from collections.abc import Callable, Iterator, Mapping, Set 

47from typing import TYPE_CHECKING, Any, BinaryIO, ClassVar, TypeAlias, final 

48 

49from lsst.resources import ResourceHandleProtocol, ResourcePath 

50from lsst.utils.introspection import get_full_type_name 

51from lsst.utils.timer import time_this 

52 

53from ._config import Config 

54from ._config_support import LookupKey, processLookupConfigs 

55from ._file_descriptor import FileDescriptor 

56from ._location import Location 

57from ._rubin.temporary_for_ingest import TemporaryForIngest 

58from .dimensions import DataCoordinate, DimensionUniverse 

59from .mapping_factory import MappingFactory 

60 

61log = logging.getLogger(__name__) 

62 

63if TYPE_CHECKING: 

64 from ._dataset_provenance import DatasetProvenance 

65 from ._dataset_ref import DatasetRef 

66 from ._dataset_type import DatasetType 

67 from ._storage_class import StorageClass 

68 from .datastore.cache_manager import AbstractDatastoreCacheManager 

69 

70 # Define a new special type for functions that take "entity" 

71 Entity: TypeAlias = DatasetType | DatasetRef | StorageClass | str 

72 

73 

class FileIntegrityError(RuntimeError):
    """Exception raised when a file's metadata is inconsistent with the
    metadata supplied by the datastore.
    """

79 

class FormatterNotImplementedError(NotImplementedError):
    """Exception raised when a formatter does not implement the specific
    read or write method being requested.
    """

85 

86class FormatterV2: 

87 """Interface for reading and writing datasets using URIs. 

88 

89 The formatters are associated with a particular `StorageClass`. 

90 

91 Parameters 

92 ---------- 

93 file_descriptor : `FileDescriptor`, optional 

94 Identifies the file to read or write, and the associated storage 

95 classes and parameter information. 

96 ref : `DatasetRef` 

97 The dataset associated with this formatter. Should not be a component 

98 dataset ref. 

99 write_parameters : `dict`, optional 

100 Parameters to control how the dataset is serialized. 

101 write_recipes : `dict`, optional 

102 Detailed write recipes indexed by recipe name. 

103 

104 **kwargs 

105 Additional arguments that will be ignored but allow for 

106 `Formatter` V1 parameters to be given. 

107 

108 Notes 

109 ----- 

110 A `FormatterV2` author should not override the default `read` or `write` 

111 method. Instead for read the formatter author should implement one or all 

112 of `read_from_stream`, `read_from_uri`, or `read_from_local_file`. The 

113 method `read_from_uri` will always be attempted first and could be more 

114 efficient (since it allows the possibility for a subset of the data file to 

115 be accessed remotely when parameters or components are specified) but it 

116 will not update the local cache. If the entire contents of the remote file 

117 are being accessed (no component or parameters defined) and the dataset 

118 would be cached, `read_from_uri` will be called with a local file. If the 

119 file is remote and the parameters that have been included are known to be 

120 more efficiently handled with a local file, the `read_from_uri` method can 

121 return `NotImplemented` to indicate that a local file should be given 

122 instead. 

123 

124 Similarly for writes, the `write` method can not be subclassed. Instead 

125 the formatter author should implement `to_bytes` or `write_local_file`. 

126 For local URIs the system will always call `write_local_file` first (which 

127 by default will call `to_bytes`) to ensure atomic writes are implemented. 

128 For remote URIs with local caching disabled, `to_bytes` will be called 

129 first and the remote updated directly. If the dataset should be cached 

130 it will always be written locally first. 

131 """ 

132 

133 unsupported_parameters: ClassVar[Set[str] | None] = frozenset() 

134 """Set of read parameters not understood by this `Formatter`. An empty set 

135 means all parameters are supported. `None` indicates that no parameters 

136 are supported. These parameters should match those defined in the storage 

137 class definition. (`frozenset`). 

138 """ 

139 

140 supported_write_parameters: ClassVar[Set[str] | None] = None 

141 """Parameters understood by this formatter that can be used to control 

142 how a dataset is serialized. `None` indicates that no parameters are 

143 supported.""" 

144 

145 default_extension: ClassVar[str | None] = None 

146 """Default extension to use when writing a file. 

147 

148 Can be `None` if the extension is determined dynamically. Use the 

149 `get_write_extension` method to get the actual extension to use. 

150 """ 

151 

152 supported_extensions: ClassVar[Set[str]] = frozenset() 

153 """Set of all extensions supported by this formatter. 

154 

155 Any extension assigned to the ``default_extension`` property will be 

156 automatically included in the list of supported extensions. 

157 """ 

158 

159 can_read_from_uri: ClassVar[bool] = False 

160 """Declare whether `read_from_uri` is available to this formatter.""" 

161 

162 can_read_from_stream: ClassVar[bool] = False 

163 """Declare whether `read_from_stream` is available to this formatter.""" 

164 

165 can_read_from_local_file: ClassVar[bool] = False 

166 """Declare whether `read_from_local_file` is available to this 

167 formatter.""" 

168 

169 def __init__( 

170 self, 

171 file_descriptor: FileDescriptor, 

172 *, 

173 ref: DatasetRef, 

174 write_parameters: Mapping[str, Any] | None = None, 

175 write_recipes: Mapping[str, Any] | None = None, 

176 # Compatibility parameters. Unused in v2. 

177 **kwargs: Any, 

178 ): 

179 if not isinstance(file_descriptor, FileDescriptor): 

180 raise TypeError("File descriptor must be a FileDescriptor") 

181 

182 self._file_descriptor = file_descriptor 

183 

184 if ref.isComponent(): 

185 # It is a component ref for disassembled composites. 

186 ref = ref.makeCompositeRef() 

187 self._dataset_ref = ref 

188 

189 # Check that the write parameters are allowed 

190 if write_parameters: 

191 if self.supported_write_parameters is None: 

192 raise ValueError( 

193 f"This formatter does not accept any write parameters. Got: {', '.join(write_parameters)}" 

194 ) 

195 else: 

196 given = set(write_parameters) 

197 unknown = given - self.supported_write_parameters 

198 if unknown: 

199 s = "s" if len(unknown) != 1 else "" 

200 unknownStr = ", ".join(f"'{u}'" for u in unknown) 

201 raise ValueError(f"This formatter does not accept parameter{s} {unknownStr}") 

202 

203 self._write_parameters = write_parameters 

204 self._write_recipes = self.validate_write_recipes(write_recipes) 

205 

206 def __str__(self) -> str: 

207 return f"{self.name()}@{self.file_descriptor.location.uri}" 

208 

209 def __repr__(self) -> str: 

210 return f"{self.name()}({self.file_descriptor!r})" 

211 

212 @property 

213 def file_descriptor(self) -> FileDescriptor: 

214 """File descriptor associated with this formatter 

215 (`FileDescriptor`). 

216 """ 

217 return self._file_descriptor 

218 

219 @property 

220 def data_id(self) -> DataCoordinate: 

221 """Return Data ID associated with this formatter (`DataCoordinate`).""" 

222 return self._dataset_ref.dataId 

223 

224 @property 

225 def dataset_ref(self) -> DatasetRef: 

226 """Return Dataset Ref associated with this formatter (`DatasetRef`).""" 

227 return self._dataset_ref 

228 

229 @property 

230 def write_parameters(self) -> Mapping[str, Any]: 

231 """Parameters to use when writing out datasets.""" 

232 if self._write_parameters is not None: 

233 return self._write_parameters 

234 return {} 

235 

236 @property 

237 def write_recipes(self) -> Mapping[str, Any]: 

238 """Detailed write Recipes indexed by recipe name.""" 

239 if self._write_recipes is not None: 

240 return self._write_recipes 

241 return {} 

242 

243 def get_write_extension(self) -> str: 

244 """Extension to use when writing a file.""" 

245 default_extension = self.default_extension 

246 extension = default_extension if default_extension is not None else "" 

247 return extension 

248 

249 def can_accept(self, in_memory_dataset: Any) -> bool: 

250 """Indicate whether this formatter can accept the specified 

251 storage class directly. 

252 

253 Parameters 

254 ---------- 

255 in_memory_dataset : `object` 

256 The dataset that is to be written. 

257 

258 Returns 

259 ------- 

260 accepts : `bool` 

261 If `True` the formatter can write data of this type without 

262 requiring datastore to convert it. If `False` the datastore 

263 will attempt to convert before writing. 

264 

265 Notes 

266 ----- 

267 The base class always returns `False` even if the given type is an 

268 instance of the storage class type. This will result in a storage 

269 class conversion no-op but also allows mocks with mocked storage 

270 classes to work properly. 

271 """ 

272 return False 

273 

274 @classmethod 

275 def validate_write_recipes(cls, recipes: Mapping[str, Any] | None) -> Mapping[str, Any] | None: 

276 """Validate supplied recipes for this formatter. 

277 

278 The recipes are supplemented with default values where appropriate. 

279 

280 Parameters 

281 ---------- 

282 recipes : `dict` 

283 Recipes to validate. 

284 

285 Returns 

286 ------- 

287 validated : `dict` 

288 Validated recipes. 

289 

290 Raises 

291 ------ 

292 RuntimeError 

293 Raised if validation fails. The default implementation raises 

294 if any recipes are given. 

295 """ 

296 if recipes: 

297 raise RuntimeError(f"This formatter does not understand these write recipes: {recipes}") 

298 return recipes 

299 

300 @classmethod 

301 def name(cls) -> str: 

302 """Return the fully qualified name of the formatter. 

303 

304 Returns 

305 ------- 

306 name : `str` 

307 Fully-qualified name of formatter class. 

308 """ 

309 return get_full_type_name(cls) 

310 

311 def _is_disassembled(self) -> bool: 

312 """Return `True` if this formatter is looking at a disassembled 

313 component. 

314 """ 

315 return self.file_descriptor.component is not None 

316 

317 def _check_resource_size(self, uri: ResourcePath, recorded_size: int, resource_size: int) -> None: 

318 """Compare the recorded size with the resource size. 

319 

320 The given URI will not be accessed. 

321 """ 

322 if recorded_size >= 0 and resource_size != recorded_size: 

323 raise FileIntegrityError( 

324 "Integrity failure in Datastore. " 

325 f"Size of file {uri} ({resource_size}) " 

326 f"does not match size recorded in registry of {recorded_size}" 

327 ) 

328 

329 def _get_cache_ref(self) -> DatasetRef: 

330 """Get the `DatasetRef` to use for cache look ups. 

331 

332 Returns 

333 ------- 

334 ref : `lsst.daf.butler.DatasetRef` 

335 The dataset ref to use when looking in the cache. 

336 For single-file dataset this will be the dataset ref directly. 

337 If this is disassembled we need the component and the component 

338 will be in the `FileDescriptor`. 

339 """ 

340 if self.file_descriptor.component is None: 

341 cache_ref = self.dataset_ref 

342 else: 

343 cache_ref = self.dataset_ref.makeComponentRef(self.file_descriptor.component) 

344 return cache_ref 

345 

346 def _ensure_cache( 

347 self, cache_manager: AbstractDatastoreCacheManager | None = None 

348 ) -> AbstractDatastoreCacheManager: 

349 """Return the cache if given else return a null cache.""" 

350 if cache_manager is None: 

351 # Circular import avoidance. 

352 from .datastore.cache_manager import DatastoreDisabledCacheManager 

353 

354 cache_manager = DatastoreDisabledCacheManager(None, None) 

355 return cache_manager 

356 

357 def read( 

358 self, 

359 component: str | None = None, 

360 expected_size: int = -1, 

361 cache_manager: AbstractDatastoreCacheManager | None = None, 

362 ) -> Any: 

363 """Read a Dataset. 

364 

365 Parameters 

366 ---------- 

367 component : `str`, optional 

368 Component to read from the file. Only used if the `StorageClass` 

369 for reading differed from the `StorageClass` used to write the 

370 file. 

371 expected_size : `int`, optional 

372 If known, the expected size of the resource to read. This can be 

373 used for verification or to decide whether to do a direct read or a 

374 file download. ``-1`` indicates the file size is not known. 

375 cache_manager : `AbstractDatastoreCacheManager` 

376 A cache manager to use to allow a formatter to cache a remote file 

377 locally or read a cached file that is already local. 

378 

379 Returns 

380 ------- 

381 in_memory_dataset : `object` 

382 The requested Dataset. 

383 

384 Notes 

385 ----- 

386 This method should not be subclassed. Instead formatter subclasses 

387 should re-implement the specific ``read_from_*`` methods as 

388 appropriate. Each of these methods has a corresponding class property 

389 that must be `True` for the method to be called. 

390 

391 The priority for reading is: 

392 

393 * `read_from_uri` 

394 * `read_from_stream` 

395 * `read_from_local_file` 

396 * `read_from_uri` (but with a local file) 

397 

398 Any of these methods can return `NotImplemented` if there is a desire 

399 to skip to the next one in the list. If a dataset is being requested 

400 with no component, no parameters, and it should also be added to the 

401 local cache, the first two calls will be skipped (unless 

402 `read_from_stream` is the only implemented read method) such that a 

403 local file will be used. 

404 

405 A Formatter can also read a file from within a Zip file if the 

406 URI associated with the `FileDescriptor` corresponds to a file with 

407 a `.zip` extension and a URI fragment of the form 

408 ``zip-path={path_in_zip}``. When reading a file from within a Zip 

409 file the priority for reading is: 

410 

411 * `read_from_stream` 

412 * `read_from_local_file` 

413 * `read_from_uri` 

414 

415 There are multiple cases that must be handled for reading: 

416 

417 For a single file: 

418 

419 * No component requested, read the whole file. 

420 * Component requested, optionally read the component efficiently, 

421 else read the whole file and extract the component. 

422 * Derived component requested, read whole file or read relevant 

423 component and derive. 

424 

425 Disassembled Composite: 

426 

427 * The file to read here is the component itself. Formatter only knows 

428 about this one component file. Should be no component specified 

429 in the ``read`` call but the `FileDescriptor` will know which 

430 component this is. 

431 * A derived component. The file to read is a component but not the 

432 specified component. The caching needs the component from which 

433 it's derived. 

434 

435 Raises 

436 ------ 

437 lsst.daf.butler.FormatterNotImplementedError 

438 Raised if no implementations were found that could read this 

439 resource. 

440 """ 

441 # If the file to read is a ZIP file with a fragment requesting 

442 # a file within the ZIP file, it is no longer possible to use the 

443 # direct read from URI option and the contents of the Zip file must 

444 # be extracted. 

445 uri = self.file_descriptor.location.uri 

446 if uri.fragment and uri.unquoted_fragment.startswith("zip-path="): 

447 _, _, path_in_zip = uri.unquoted_fragment.partition("=") 

448 

449 # Open the Zip file using ResourcePath. 

450 with uri.open("rb") as fd: 

451 with zipfile.ZipFile(fd) as zf: # type: ignore 

452 if self.can_read_from_stream: 

453 with contextlib.closing(zf.open(path_in_zip)) as zip_fd: 

454 result = self.read_from_stream(zip_fd, component, expected_size=expected_size) 

455 

456 if result is not NotImplemented: 

457 return result 

458 

459 # For now for both URI and local file options we retrieve 

460 # the bytes to a temporary local and use that. 

461 _, suffix = os.path.splitext(path_in_zip) 

462 with ResourcePath.temporary_uri(suffix=suffix) as tmp_uri: 

463 tmp_uri.write(zf.read(path_in_zip)) 

464 

465 if self.can_read_from_local_file: 

466 result = self.read_from_local_file( 

467 tmp_uri.ospath, component, expected_size=expected_size 

468 ) 

469 if result is not NotImplemented: 

470 return result 

471 if self.can_read_from_uri: 

472 result = self.read_from_uri(tmp_uri, component, expected_size=expected_size) 

473 if result is not NotImplemented: 

474 return result 

475 

476 raise FormatterNotImplementedError( 

477 f"Formatter {self.name()} could not read the file at {uri} using any method." 

478 ) 

479 

480 # If the there are no parameters, no component request, and 

481 # the file should be cached, it is better to defer the read_from_uri 

482 # to use the local file and populate the cache. 

483 prefer_local = False 

484 if ( 

485 component is None 

486 and not self.file_descriptor.parameters 

487 and self._ensure_cache(cache_manager).should_be_cached(self._get_cache_ref()) 

488 ): 

489 prefer_local = True 

490 

491 # First see if the formatter can support direct remote read from 

492 # a URI. This can be called later from the local path. If it returns 

493 # NotImplemented the formatter decided on its own that local 

494 # reads are preferred. 

495 if not prefer_local and self.can_read_from_uri: 

496 result = self.read_directly_from_possibly_cached_uri( 

497 component, expected_size, cache_manager=cache_manager 

498 ) 

499 if result is not NotImplemented: 

500 return result 

501 

502 # Some formatters might want to be able to read directly from 

503 # an open file stream. This is preferred over forcing a download 

504 # to local file system unless a file read option is available and we 

505 # want to store it in the cache because the whole file is being read. 

506 if self.can_read_from_stream and not ( 

507 prefer_local and (self.can_read_from_uri or self.can_read_from_local_file) 

508 ): 

509 result = self.read_from_possibly_cached_stream( 

510 component, expected_size, cache_manager=cache_manager 

511 ) 

512 if result is not NotImplemented: 

513 return result 

514 

515 # Finally, try to read the local file. 

516 if self.can_read_from_local_file or self.can_read_from_uri: 

517 result = self.read_from_possibly_cached_local_file( 

518 component, expected_size, cache_manager=cache_manager 

519 ) 

520 if result is not NotImplemented: 

521 return result 

522 

523 raise FormatterNotImplementedError( 

524 f"Formatter {self.name()} could not read the file at {uri} using any method." 

525 ) 

526 

527 def _read_from_possibly_cached_location_no_cache_write( 

528 self, 

529 callback: Callable[[ResourcePath, str | None, int], Any], 

530 component: str | None = None, 

531 expected_size: int = -1, 

532 *, 

533 cache_manager: AbstractDatastoreCacheManager | None = None, 

534 ) -> Any: 

535 """Read from the cache and call payload without writing to cache.""" 

536 cache_manager = self._ensure_cache(cache_manager) 

537 

538 uri = self.file_descriptor.location.uri 

539 cache_ref = self._get_cache_ref() 

540 

541 # The component for log messages is either the component requested 

542 # explicitly or the component from the file descriptor. 

543 log_component = component if component is not None else self.file_descriptor.component 

544 

545 with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file: 

546 if cached_file is not None: 

547 desired_uri = cached_file 

548 msg = f" (cached version of {uri})" 

549 else: 

550 desired_uri = uri 

551 msg = "" 

552 

553 if desired_uri.isLocal: 

554 # Do not spend the time doing a slow size() call to a remote 

555 # resource. 

556 self._check_resource_size(desired_uri, expected_size, desired_uri.size()) 

557 

558 with time_this( 

559 log, 

560 msg="Reading%s from file handle %s%s with formatter %s", 

561 args=( 

562 f" component {log_component}" if log_component else "", 

563 desired_uri, 

564 msg, 

565 self.name(), 

566 ), 

567 ): 

568 return callback(desired_uri, component, expected_size) 

569 

570 def read_from_possibly_cached_stream( 

571 self, 

572 component: str | None = None, 

573 expected_size: int = -1, 

574 *, 

575 cache_manager: AbstractDatastoreCacheManager | None = None, 

576 ) -> Any: 

577 """Read from a stream, checking for possible presence in local cache. 

578 

579 Parameters 

580 ---------- 

581 component : `str`, optional 

582 Component to read from the file. Only used if the `StorageClass` 

583 for reading differed from the `StorageClass` used to write the 

584 file. 

585 expected_size : `int`, optional 

586 If known, the expected size of the resource to read. This can be 

587 used for verification or to decide whether to do a direct read or a 

588 file download. ``-1`` indicates the file size is not known. 

589 cache_manager : `AbstractDatastoreCacheManager` 

590 A cache manager to use to allow a formatter to check if there is 

591 a copy of the file in the local cache. 

592 

593 Returns 

594 ------- 

595 in_memory_dataset : `object` or `NotImplemented` 

596 The requested Dataset or an indication that the read mode was 

597 not implemented. 

598 

599 Notes 

600 ----- 

601 Calls `read_from_stream` but will first check the datastore cache 

602 in case the file is present locally. This method will not download 

603 a file to the local cache. 

604 """ 

605 

606 def _open_stream(uri: ResourcePath, comp: str | None, size: int = -1) -> Any: 

607 with uri.open("rb") as fd: 

608 return self.read_from_stream(fd, comp, expected_size=size) 

609 

610 return self._read_from_possibly_cached_location_no_cache_write( 

611 _open_stream, component, expected_size=expected_size, cache_manager=cache_manager 

612 ) 

613 

614 def read_directly_from_possibly_cached_uri( 

615 self, 

616 component: str | None = None, 

617 expected_size: int = -1, 

618 *, 

619 cache_manager: AbstractDatastoreCacheManager | None = None, 

620 ) -> Any: 

621 """Read from arbitrary URI, checking for possible presence in local 

622 cache. 

623 

624 Parameters 

625 ---------- 

626 component : `str`, optional 

627 Component to read from the file. Only used if the `StorageClass` 

628 for reading differed from the `StorageClass` used to write the 

629 file. 

630 expected_size : `int`, optional 

631 If known, the expected size of the resource to read. This can be 

632 ``-1`` indicates the file size is not known. 

633 cache_manager : `AbstractDatastoreCacheManager` 

634 A cache manager to use to allow a formatter to check if there is 

635 a copy of the file in the local cache. 

636 

637 Returns 

638 ------- 

639 in_memory_dataset : `object` or `NotImplemented` 

640 The requested Dataset or an indication that the read mode was 

641 not implemented. 

642 

643 Notes 

644 ----- 

645 This method will first check the datastore cache 

646 in case the file is present locally. This method will not cache a 

647 remote dataset and will only do a size check for local files to avoid 

648 unnecessary round trips to a remote server. 

649 

650 The URI will be read by calling `read_from_uri`. 

651 """ 

652 

653 def _open_uri(uri: ResourcePath, comp: str | None, size: int = -1) -> Any: 

654 return self.read_from_uri(uri, comp, expected_size=size) 

655 

656 return self._read_from_possibly_cached_location_no_cache_write( 

657 _open_uri, component, expected_size=expected_size, cache_manager=cache_manager 

658 ) 

659 

660 def read_from_possibly_cached_local_file( 

661 self, 

662 component: str | None = None, 

663 expected_size: int = -1, 

664 *, 

665 cache_manager: AbstractDatastoreCacheManager | None = None, 

666 ) -> Any: 

667 """Read a dataset ensuring that a local file is used, checking the 

668 cache for it. 

669 

670 Parameters 

671 ---------- 

672 component : `str`, optional 

673 Component to read from the file. Only used if the `StorageClass` 

674 for reading differed from the `StorageClass` used to write the 

675 file. 

676 expected_size : `int`, optional 

677 If known, the expected size of the resource to read. This can be 

678 used for verification or to decide whether to do a direct read or a 

679 file download. ``-1`` indicates the file size is not known. 

680 cache_manager : `AbstractDatastoreCacheManager` 

681 A cache manager to use to allow a formatter to cache a remote file 

682 locally or read a cached file that is already local. 

683 

684 Returns 

685 ------- 

686 in_memory_dataset : `object` or `NotImplemented` 

687 The requested Dataset or an indication that the read mode was 

688 not implemented. 

689 

690 Notes 

691 ----- 

692 The file will be downloaded and cached if it is a remote resource. 

693 The file contents will be read using `read_from_local_file` or 

694 `read_from_uri`, with preference given to the former. 

695 """ 

696 cache_manager = self._ensure_cache(cache_manager) 

697 uri = self.file_descriptor.location.uri 

698 

699 # Need to have something we can look up in the cache. 

700 cache_ref = self._get_cache_ref() 

701 

702 # The component for log messages is either the component requested 

703 # explicitly or the component from the file descriptor. 

704 log_component = component if component is not None else self.file_descriptor.component 

705 

706 result = NotImplemented 

707 

708 # Ensure we have a local file. 

709 with cache_manager.find_in_cache(cache_ref, uri.getExtension()) as cached_file: 

710 if cached_file is not None: 

711 msg = f"(via cache read of remote file {uri})" 

712 uri = cached_file 

713 else: 

714 msg = "" 

715 

716 with uri.as_local() as local_uri: 

717 self._check_resource_size(self.file_descriptor.location.uri, expected_size, local_uri.size()) 

718 can_be_cached = False 

719 if uri != local_uri: 

720 # URI was remote and file was downloaded 

721 cache_msg = "" 

722 

723 if cache_manager.should_be_cached(cache_ref): 

724 # In this scenario we want to ask if the downloaded 

725 # file should be cached but we should not cache 

726 # it until after we've used it (to ensure it can't 

727 # be expired whilst we are using it). 

728 can_be_cached = True 

729 

730 # Say that it is "likely" to be cached because 

731 # if the formatter read fails we will not be 

732 # caching this file. 

733 cache_msg = " and likely cached" 

734 

735 msg = f"(via download to local file{cache_msg})" 

736 

737 with time_this( 

738 log, 

739 msg="Reading%s from location %s %s with formatter %s", 

740 args=( 

741 f" component {log_component}" if log_component else "", 

742 uri, 

743 msg, 

744 self.name(), 

745 ), 

746 ): 

747 if self.can_read_from_local_file: 

748 result = self.read_from_local_file( 

749 local_uri.ospath, component=component, expected_size=expected_size 

750 ) 

751 if result is NotImplemented and self.can_read_from_uri: 

752 # If the direct URI reader was skipped earlier and 

753 # there is no explicit local file implementation, pass 

754 # in the guaranteed local URI to the generic reader. 

755 result = self.read_from_uri( 

756 local_uri, component=component, expected_size=expected_size 

757 ) 

758 

759 # File was read successfully so can move to cache. 

760 # Also move to cache even if NotImplemented was returned. 

761 if can_be_cached: 

762 cache_manager.move_to_cache(local_uri, cache_ref) 

763 

764 return result 

765 

766 def read_from_uri(self, uri: ResourcePath, component: str | None = None, expected_size: int = -1) -> Any: 

767 """Read a dataset from a URI that can be local or remote. 

768 

769 Parameters 

770 ---------- 

771 uri : `lsst.resources.ResourcePath` 

772 URI to use to read the dataset. This URI can be local or remote 

773 and can refer to the actual resource or to a locally cached file. 

774 component : `str` or `None`, optional 

775 The component to be read from the dataset. 

776 expected_size : `int`, optional 

777 If known, the expected size of the resource to read. This can be 

778 ``-1`` indicates the file size is not known. 

779 

780 Returns 

781 ------- 

782 in_memory_dataset : `object` or `NotImplemented` 

783 The Python object read from the resource or `NotImplemented`. 

784 

785 Raises 

786 ------ 

787 lsst.daf.butler.FormatterNotImplementedError 

788 Raised if there is no support for direct reads from a, possibly, 

789 remote URI. 

790 

791 Notes 

792 ----- 

793 This method is only called if the class property 

794 ``can_read_from_uri`` is set to `True`. 

795 

796 It is possible that a cached local file will be given to this method 

797 even if it was originally a remote URI. This can happen if the original 

798 write resulted in the file being added to the local cache. 

799 

800 If the full file is being read this file will not be added to the 

801 local cache. Consider returning `NotImplemented` in 

802 this situation, for example if there are no parameters or component 

803 specified, and allowing the system to fall back to calling 

804 `read_from_local_file` (which will populate the cache if configured 

805 to do so). 

806 """ 

807 return NotImplemented 

808 

809 def read_from_stream( 

810 self, stream: BinaryIO | ResourceHandleProtocol, component: str | None = None, expected_size: int = -1 

811 ) -> Any: 

812 """Read from an open file descriptor. 

813 

814 Parameters 

815 ---------- 

816 stream : `lsst.resources.ResourceHandleProtocol` or \ 

817 `typing.BinaryIO` 

818 File stream to use to read the dataset. 

819 component : `str` or `None`, optional 

820 The component to be read from the dataset. 

821 expected_size : `int`, optional 

822 If known, the expected size of the resource to read. This can be 

823 ``-1`` indicates the file size is not known. 

824 

825 Returns 

826 ------- 

827 in_memory_dataset : `object` or `NotImplemented` 

828 The Python object read from the stream or `NotImplemented`. 

829 

830 Notes 

831 ----- 

832 Only called if the class property ``can_read_from_stream`` is `True`. 

833 """ 

834 return NotImplemented 

835 

836 def read_from_local_file(self, path: str, component: str | None = None, expected_size: int = -1) -> Any: 

837 """Read a dataset from a URI guaranteed to refer to the local file 

838 system. 

839 

840 Parameters 

841 ---------- 

842 path : `str` 

843 Path to a local file that should be read. 

844 component : `str` or `None`, optional 

845 The component to be read from the dataset. 

846 expected_size : `int`, optional 

847 If known, the expected size of the resource to read. This can be 

848 ``-1`` indicates the file size is not known. 

849 

850 Returns 

851 ------- 

852 in_memory_dataset : `object` or `NotImplemented` 

853 The Python object read from the resource or `NotImplemented`. 

854 

855 Raises 

856 ------ 

857 lsst.daf.butler.FormatterNotImplementedError 

858 Raised if there is no implementation written to read data 

859 from a local file. 

860 

861 Notes 

862 ----- 

863 This method will only be called if the class property 

864 ``can_read_from_local_file`` is `True` and other options were not 

865 used. 

866 """ 

867 return NotImplemented 

868 

869 def add_provenance( 

870 self, in_memory_dataset: Any, /, *, provenance: DatasetProvenance | None = None 

871 ) -> Any: 

872 """Add provenance to the dataset. 

873 

874 Parameters 

875 ---------- 

876 in_memory_dataset : `object` 

877 The dataset to serialize. 

878 provenance : `~lsst.daf.butler.DatasetProvenance` or `None`, optional 

879 Provenance to attach to dataset. 

880 

881 Returns 

882 ------- 

883 dataset_to_write : `object` 

884 The dataset to use for serialization. Can be the same object as 

885 given. 

886 

887 Notes 

888 ----- 

889 The base class implementation returns the given object unchanged. 

890 """ 

891 return in_memory_dataset 

892 

893 @final 

894 def write( 

895 self, 

896 in_memory_dataset: Any, 

897 /, 

898 *, 

899 cache_manager: AbstractDatastoreCacheManager | None = None, 

900 provenance: DatasetProvenance | None = None, 

901 ) -> None: 

902 """Write a Dataset. 

903 

904 Parameters 

905 ---------- 

906 in_memory_dataset : `object` 

907 The Dataset to serialize. 

908 cache_manager : `AbstractDatastoreCacheManager` 

909 A cache manager to use to allow a formatter to cache the written 

910 file. 

911 provenance : `DatasetProvenance` | `None`, optional 

912 Provenance to attach to the file being written. 

913 

914 Returns 

915 ------- 

916 None 

917 

918 Raises 

919 ------ 

920 lsst.daf.butler.FormatterNotImplementedError 

921 Raised if the formatter subclass has not implemented 

922 `write_local_file` and `to_bytes` was not called. 

923 Exception 

924 Raised if there is an error serializing the dataset to disk. 

925 

926 Notes 

927 ----- 

928 The intent is for subclasses to implement either `to_bytes` or 

929 `write_local_file` or both and not to subclass this method. 

930 """ 

931 # Ensure we are using the correct file extension. 

932 uri = self.file_descriptor.location.uri.updatedExtension(self.get_write_extension()) 

933 

934 # Attach any provenance to the dataset. This could involve returning 

935 # a different object. 

936 in_memory_dataset = self.add_provenance(in_memory_dataset, provenance=provenance) 

937 

938 written = self.write_direct(in_memory_dataset, uri, cache_manager) 

939 if not written: 

940 self.write_locally_then_move(in_memory_dataset, uri, cache_manager) 

941 

    def write_direct(
        self,
        in_memory_dataset: Any,
        uri: ResourcePath,
        cache_manager: AbstractDatastoreCacheManager | None = None,
    ) -> bool:
        """Serialize and write directly to final location.

        Parameters
        ----------
        in_memory_dataset : `object`
            The Dataset to serialize.
        uri : `lsst.resources.ResourcePath`
            URI to use when writing the serialized dataset.
        cache_manager : `AbstractDatastoreCacheManager`
            A cache manager to use to allow a formatter to cache the written
            file.

        Returns
        -------
        written : `bool`
            Flag to indicate whether the direct write did happen.

        Raises
        ------
        Exception
            Raised if there was a failure from serializing to bytes that
            was not `~lsst.daf.butler.FormatterNotImplementedError`.

        Notes
        -----
        This method will call `to_bytes` to serialize the in-memory dataset
        and then will call the `~lsst.resources.ResourcePath.write` method
        directly.

        If the dataset should be cached or is local the file will not be
        written and the method will return `False`. This is because local URIs
        should be written to a temporary file name and then renamed to allow
        atomic writes. That path is handled by `write_locally_then_move`
        through `write_local_file`) and is preferred over this method being
        subclassed and the atomic write re-implemented.
        """
        # Fall back to a default cache manager if none was supplied.
        cache_manager = self._ensure_cache(cache_manager)

        # For remote URIs some datasets can be serialized directly
        # to bytes and sent to the remote datastore without writing a
        # file. If the dataset is intended to be saved to the cache
        # a file is always written and direct write to the remote
        # datastore is bypassed.
        data_written = False
        if not uri.isLocal and not cache_manager.should_be_cached(self._get_cache_ref()):
            # Remote URI that is not cached so can write directly.
            try:
                serialized_dataset = self.to_bytes(in_memory_dataset)
            except FormatterNotImplementedError:
                # Fallback to the file writing option.
                pass
            except Exception as e:
                # Any other serialization failure is fatal; annotate it
                # with dataset context before letting it propagate.
                e.add_note(
                    f"Failed to serialize dataset {self.dataset_ref} of "
                    f"type {get_full_type_name(in_memory_dataset)} to bytes."
                )
                raise
            else:
                # Serialization succeeded: push the bytes straight to the
                # remote resource, overwriting any existing artifact.
                log.debug("Writing bytes directly to %s", uri)
                uri.write(serialized_dataset, overwrite=True)
                log.debug("Successfully wrote bytes directly to %s", uri)
                data_written = True
        # False tells the caller to use the write-locally-then-move path.
        return data_written

1011 

    def write_locally_then_move(
        self,
        in_memory_dataset: Any,
        uri: ResourcePath,
        cache_manager: AbstractDatastoreCacheManager | None = None,
    ) -> None:
        """Write file to file system and then move to final location.

        Parameters
        ----------
        in_memory_dataset : `object`
            The Dataset to serialize.
        uri : `lsst.resources.ResourcePath`
            URI to use when writing the serialized dataset.
        cache_manager : `AbstractDatastoreCacheManager`
            A cache manager to use to allow a formatter to cache the written
            file.

        Raises
        ------
        lsst.daf.butler.FormatterNotImplementedError
            Raised if the formatter subclass has not implemented
            `write_local_file`.
        Exception
            Raised if there is an error serializing the dataset to disk.
        """
        # Fall back to a default cache manager if none was supplied.
        cache_manager = self._ensure_cache(cache_manager)

        # The context manager removes the temporary file on exit.
        with TemporaryForIngest.make_path(uri) as temporary_uri:
            # Need to configure the formatter to write to a different
            # location and that needs us to overwrite internals
            log.debug("Writing dataset to temporary location at %s", temporary_uri)

            # Assumes that if write_local_file is not subclassed that
            # to_bytes will be called using the base class definition.
            try:
                self.write_local_file(in_memory_dataset, temporary_uri)
            except Exception as e:
                # Annotate the failure with dataset context before
                # propagating to the caller.
                e.add_note(
                    f"Failed to serialize dataset {self.dataset_ref} of type"
                    f" {get_full_type_name(in_memory_dataset)} to "
                    f"temporary location {temporary_uri}."
                )
                raise

            # Use move for a local file since that becomes an efficient
            # os.rename. For remote resources we use copy to allow the
            # file to be cached afterwards.
            transfer = "move" if uri.isLocal else "copy"

            uri.transfer_from(temporary_uri, transfer=transfer, overwrite=True)

            if transfer == "copy":
                # Cache if required
                # (after a copy the temporary file still exists and can be
                # moved into the cache rather than discarded).
                cache_manager.move_to_cache(temporary_uri, self._get_cache_ref())

            log.debug("Successfully wrote dataset to %s via a temporary file.", uri)

1069 

1070 def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None: 

1071 """Serialize the in-memory dataset to a local file. 

1072 

1073 Parameters 

1074 ---------- 

1075 in_memory_dataset : `object` 

1076 The Python object to serialize. 

1077 uri : `~lsst.resources.ResourcePath` 

1078 The URI to use when writing the file. 

1079 

1080 Notes 

1081 ----- 

1082 By default this method will attempt to call `to_bytes` and then 

1083 write these bytes to the file. 

1084 

1085 Raises 

1086 ------ 

1087 lsst.daf.butler.FormatterNotImplementedError 

1088 Raised if the formatter subclass has not implemented this method 

1089 or has failed to implement the `to_bytes` method. 

1090 """ 

1091 log.debug("Writing bytes directly to %s", uri) 

1092 uri.write(self.to_bytes(in_memory_dataset)) 

1093 log.debug("Successfully wrote bytes directly to %s", uri) 

1094 

1095 def to_bytes(self, in_memory_dataset: Any) -> bytes: 

1096 """Serialize the in-memory dataset to bytes. 

1097 

1098 Parameters 

1099 ---------- 

1100 in_memory_dataset : `object` 

1101 The Python object to serialize. 

1102 

1103 Returns 

1104 ------- 

1105 serialized_dataset : `bytes` 

1106 Bytes representing the serialized dataset. 

1107 

1108 Raises 

1109 ------ 

1110 lsst.daf.butler.FormatterNotImplementedError 

1111 Raised if the formatter has not implemented the method. This will 

1112 not cause a problem if `write_local_file` has been implemented. 

1113 """ 

1114 raise FormatterNotImplementedError( 

1115 f"This formatter can not convert {get_full_type_name(in_memory_dataset)} directly to bytes." 

1116 ) 

1117 

1118 def make_updated_location(self, location: Location) -> Location: 

1119 """Return a new `Location` updated with this formatter's extension. 

1120 

1121 Parameters 

1122 ---------- 

1123 location : `Location` 

1124 The location to update. 

1125 

1126 Returns 

1127 ------- 

1128 updated : `Location` 

1129 A new `Location` with a new file extension applied. 

1130 """ 

1131 location = location.clone() 

1132 # If the extension is "" the extension will be removed. 

1133 location.updateExtension(self.get_write_extension()) 

1134 return location 

1135 

1136 @classmethod 

1137 def validate_extension(cls, location: Location) -> None: 

1138 """Check the extension of the provided location for compatibility. 

1139 

1140 Parameters 

1141 ---------- 

1142 location : `Location` 

1143 Location from which to extract a file extension. 

1144 

1145 Returns 

1146 ------- 

1147 None 

1148 

1149 Raises 

1150 ------ 

1151 ValueError 

1152 Raised if the formatter does not understand this extension. 

1153 """ 

1154 supported = set(cls.supported_extensions) 

1155 default = cls.default_extension # type: ignore 

1156 

1157 # If extension is implemented as an instance property it won't return 

1158 # a string when called as a class property. Assume that 

1159 # the supported extensions class property is complete. 

1160 if default is not None and isinstance(default, str): 

1161 supported.add(default) 

1162 

1163 # Get the file name from the uri 

1164 file = location.uri.basename() 

1165 

1166 # Check that this file name ends with one of the supported extensions. 

1167 # This is less prone to confusion than asking the location for 

1168 # its extension and then doing a set comparison 

1169 for ext in supported: 

1170 if file.endswith(ext): 

1171 return 

1172 

1173 raise ValueError( 

1174 f"Extension '{location.getExtension()}' on '{location}' " 

1175 f"is not supported by Formatter '{cls.__name__}' (supports: {supported})" 

1176 ) 

1177 

1178 def predict_path(self) -> str: 

1179 """Return the path that would be returned by write. 

1180 

1181 Does not write any data file. 

1182 

1183 Uses the `FileDescriptor` associated with the instance. 

1184 

1185 Returns 

1186 ------- 

1187 path : `str` 

1188 Path within datastore that would be associated with the location 

1189 stored in this `Formatter`. 

1190 """ 

1191 updated = self.make_updated_location(self.file_descriptor.location) 

1192 return updated.pathInStore.path 

1193 

1194 def segregate_parameters(self, parameters: dict[str, Any] | None = None) -> tuple[dict, dict]: 

1195 """Segregate the supplied parameters. 

1196 

1197 This splits the parameters into those understood by the 

1198 formatter and those not understood by the formatter. 

1199 

1200 Any unsupported parameters are assumed to be usable by associated 

1201 assemblers. 

1202 

1203 Parameters 

1204 ---------- 

1205 parameters : `dict`, optional 

1206 Parameters with values that have been supplied by the caller 

1207 and which might be relevant for the formatter. If `None` 

1208 parameters will be read from the registered `FileDescriptor`. 

1209 

1210 Returns 

1211 ------- 

1212 supported : `dict` 

1213 Those parameters supported by this formatter. 

1214 unsupported : `dict` 

1215 Those parameters not supported by this formatter. 

1216 """ 

1217 if parameters is None: 

1218 parameters = self.file_descriptor.parameters 

1219 

1220 if parameters is None: 

1221 return {}, {} 

1222 

1223 if self.unsupported_parameters is None: 

1224 # Support none of the parameters 

1225 return {}, parameters.copy() 

1226 

1227 # Start by assuming all are supported 

1228 supported = parameters.copy() 

1229 unsupported = {} 

1230 

1231 # And remove any we know are not supported 

1232 for p in set(supported): 

1233 if p in self.unsupported_parameters: 

1234 unsupported[p] = supported.pop(p) 

1235 

1236 return supported, unsupported 

1237 

1238 

class Formatter(metaclass=ABCMeta):
    """Interface for reading and writing Datasets.

    The formatters are associated with a particular `StorageClass`.

    Parameters
    ----------
    fileDescriptor : `FileDescriptor`, optional
        Identifies the file to read or write, and the associated storage
        classes and parameter information.
    dataId : `DataCoordinate`
        Data ID associated with this formatter.
    writeParameters : `dict`, optional
        Parameters to control how the dataset is serialized.
    writeRecipes : `dict`, optional
        Detailed write recipes indexed by recipe name.
    **kwargs
        Additional parameters that can allow parameters
        from `FormatterV2` to be provided.

    Notes
    -----
    All Formatter subclasses should share the base class's constructor
    signature.
    """

    # Now assuming that Formatter v1 can only refer to files so can add
    # this property for compatibility with v2.
    extension: str | None = None
    """Default file extension to use for writing files. None means that no
    modifications will be made to the supplied file extension. (`str`)"""

    unsupportedParameters: ClassVar[Set[str] | None] = frozenset()
    """Set of read parameters not understood by this `Formatter`. An empty set
    means all parameters are supported. `None` indicates that no parameters
    are supported. These parameters should match those defined in the storage
    class definition. (`frozenset`).
    """

    supportedWriteParameters: ClassVar[Set[str] | None] = None
    """Parameters understood by this formatter that can be used to control
    how a dataset is serialized. `None` indicates that no parameters are
    supported."""

    supportedExtensions: ClassVar[Set[str]] = frozenset()
    """Set of all extensions supported by this formatter.

    Only expected to be populated by Formatters that write files. Any extension
    assigned to the ``extension`` property will be automatically included in
    the list of supported extensions."""

    def __init__(
        self,
        fileDescriptor: FileDescriptor,
        *,
        dataId: DataCoordinate | None = None,
        writeParameters: Mapping[str, Any] | None = None,
        writeRecipes: Mapping[str, Any] | None = None,
        # Allow FormatterV2 parameters to be dropped.
        **kwargs: Any,
    ):
        if not isinstance(fileDescriptor, FileDescriptor):
            raise TypeError("File descriptor must be a FileDescriptor")
        self._fileDescriptor = fileDescriptor

        # A data ID is mandatory even though the parameter keeps a default
        # of None for signature compatibility.
        if dataId is None:
            raise RuntimeError("dataId is now required for formatter initialization")
        if not isinstance(dataId, DataCoordinate):
            raise TypeError(f"DataId is required to be a DataCoordinate but got {type(dataId)}.")
        self._dataId = dataId

        # V2 compatibility.
        if writeParameters is None:
            writeParameters = kwargs.get("write_parameters")
        if writeRecipes is None:
            writeRecipes = kwargs.get("write_recipes")

        # Check that the write parameters are allowed
        if writeParameters:
            if self.supportedWriteParameters is None:
                raise ValueError(
                    f"This formatter does not accept any write parameters. Got: {', '.join(writeParameters)}"
                )
            else:
                given = set(writeParameters)
                unknown = given - self.supportedWriteParameters
                if unknown:
                    s = "s" if len(unknown) != 1 else ""
                    unknownStr = ", ".join(f"'{u}'" for u in unknown)
                    raise ValueError(f"This formatter does not accept parameter{s} {unknownStr}")

        self._writeParameters = writeParameters
        self._writeRecipes = self.validate_write_recipes(writeRecipes)

    def __str__(self) -> str:
        return f"{self.name()}@{self.fileDescriptor.location.path}"

    def __repr__(self) -> str:
        return f"{self.name()}({self.fileDescriptor!r})"

    @property
    def fileDescriptor(self) -> FileDescriptor:
        """File descriptor associated with this formatter
        (`FileDescriptor`).
        """
        return self._fileDescriptor

    @property
    def dataId(self) -> DataCoordinate:
        """Return Data ID associated with this formatter (`DataCoordinate`)."""
        return self._dataId

    @property
    def writeParameters(self) -> Mapping[str, Any]:
        """Parameters to use when writing out datasets."""
        if self._writeParameters is not None:
            return self._writeParameters
        return {}

    @property
    def writeRecipes(self) -> Mapping[str, Any]:
        """Detailed write Recipes indexed by recipe name."""
        if self._writeRecipes is not None:
            return self._writeRecipes
        return {}

    def can_accept(self, in_memory_dataset: Any) -> bool:
        """Indicate whether this formatter can accept the specified
        storage class directly.

        Parameters
        ----------
        in_memory_dataset : `object`
            The dataset that is to be written.

        Returns
        -------
        accepts : `bool`
            If `True` the formatter can write data of this type without
            requiring datastore to convert it. If `False` the datastore
            will attempt to convert before writing.

        Notes
        -----
        The base class checks that the given python type matches
        the python type specified for this formatter when
        constructed.
        """
        return isinstance(in_memory_dataset, self.file_descriptor.storageClass.pytype)

    @classmethod
    def validateWriteRecipes(cls, recipes: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
        """Validate supplied recipes for this formatter.

        The recipes are supplemented with default values where appropriate.

        Parameters
        ----------
        recipes : `dict`
            Recipes to validate.

        Returns
        -------
        validated : `dict`
            Validated recipes.

        Raises
        ------
        RuntimeError
            Raised if validation fails. The default implementation raises
            if any recipes are given.
        """
        if recipes:
            raise RuntimeError(f"This formatter does not understand these writeRecipes: {recipes}")
        return recipes

    @classmethod
    def validate_write_recipes(cls, recipes: Mapping[str, Any] | None) -> Mapping[str, Any] | None:
        """Validate supplied recipes for this formatter.

        Snake-case alias delegating to `validateWriteRecipes` so that
        subclasses overriding the camelCase form are still honored.
        """
        return cls.validateWriteRecipes(recipes)

    @classmethod
    def name(cls) -> str:
        """Return the fully qualified name of the formatter.

        Returns
        -------
        name : `str`
            Fully-qualified name of formatter class.
        """
        return get_full_type_name(cls)

    @abstractmethod
    def read(self, component: str | None = None) -> Any:
        """Read a Dataset.

        Parameters
        ----------
        component : `str`, optional
            Component to read from the file. Only used if the `StorageClass`
            for reading differed from the `StorageClass` used to write the
            file.

        Returns
        -------
        inMemoryDataset : `object`
            The requested Dataset.
        """
        raise FormatterNotImplementedError("Type does not support reading")

    @abstractmethod
    def write(self, inMemoryDataset: Any) -> None:
        """Write a Dataset.

        Parameters
        ----------
        inMemoryDataset : `object`
            The Dataset to store.
        """
        raise FormatterNotImplementedError("Type does not support writing")

    @classmethod
    def can_read_bytes(cls) -> bool:
        """Indicate if this formatter can format from bytes.

        Returns
        -------
        can : `bool`
            `True` if the `fromBytes` method is implemented.
        """
        # We have no property to read so instead try to format from a byte
        # and see what happens
        try:
            # We know the arguments are incompatible
            cls.fromBytes(cls, b"")  # type: ignore
        except FormatterNotImplementedError:
            return False
        except Exception:
            # There will be problems with the bytes we are supplying so ignore
            pass
        return True

    def fromBytes(self, serializedDataset: bytes, component: str | None = None) -> object:
        """Read serialized data into a Dataset or its component.

        Parameters
        ----------
        serializedDataset : `bytes`
            Bytes object to unserialize.
        component : `str`, optional
            Component to read from the Dataset. Only used if the `StorageClass`
            for reading differed from the `StorageClass` used to write the
            file.

        Returns
        -------
        inMemoryDataset : `object`
            The requested data as a Python object. The type of object
            is controlled by the specific formatter.
        """
        raise FormatterNotImplementedError("Type does not support reading from bytes.")

    def toBytes(self, inMemoryDataset: Any) -> bytes:
        """Serialize the Dataset to bytes based on formatter.

        Parameters
        ----------
        inMemoryDataset : `object`
            The Python object to serialize.

        Returns
        -------
        serializedDataset : `bytes`
            Bytes representing the serialized dataset.
        """
        raise FormatterNotImplementedError("Type does not support writing to bytes.")

    @contextlib.contextmanager
    def _updateLocation(self, location: Location | None) -> Iterator[Location]:
        """Temporarily replace the location associated with this formatter.

        Parameters
        ----------
        location : `Location`
            New location to use for this formatter. If `None` the
            formatter will not change but it will still return
            the old location. This allows it to be used in a code
            path where the location may not need to be updated
            but the with block is still convenient.

        Yields
        ------
        old : `Location`
            The old location that will be restored.

        Notes
        -----
        This is an internal method that should be used with care.
        It may change in the future. Should be used as a context
        manager to restore the location when the temporary is no
        longer required.
        """
        old = self._fileDescriptor.location
        try:
            if location is not None:
                self._fileDescriptor.location = location
            yield old
        finally:
            # Restore the original location even if the body raised.
            if location is not None:
                self._fileDescriptor.location = old

    def make_updated_location(self, location: Location) -> Location:
        """Return a new `Location` updated with this formatter's extension.

        Parameters
        ----------
        location : `Location`
            The location to update.

        Returns
        -------
        updated : `Location`
            A new `Location` with a new file extension applied.

        Raises
        ------
        NotImplementedError
            Raised if there is no ``extension`` attribute associated with
            this formatter.

        Notes
        -----
        This method is available to all Formatters but might not be
        implemented by all formatters. It requires that a formatter set
        an ``extension`` attribute containing the file extension used when
        writing files. If ``extension`` is `None` the supplied file will
        not be updated. Not all formatters write files so this is not
        defined in the base class.
        """
        location = location.clone()
        try:
            # We are deliberately allowing extension to be undefined by
            # default in the base class and mypy complains.
            location.updateExtension(self.extension)  # type:ignore
        except AttributeError:
            raise NotImplementedError("No file extension registered with this formatter") from None
        return location

    @classmethod
    def validate_extension(cls, location: Location) -> None:
        """Check the extension of the provided location for compatibility.

        Parameters
        ----------
        location : `Location`
            Location from which to extract a file extension.

        Returns
        -------
        None

        Raises
        ------
        NotImplementedError
            Raised if file extensions are a concept not understood by this
            formatter.
        ValueError
            Raised if the formatter does not understand this extension.

        Notes
        -----
        This method is available to all Formatters but might not be
        implemented by all formatters. It requires that a formatter set
        an ``extension`` attribute containing the file extension used when
        writing files. If ``extension`` is `None` only the set of supported
        extensions will be examined.
        """
        supported = set(cls.supportedExtensions)

        try:
            # We are deliberately allowing extension to be undefined by
            # default in the base class and mypy complains.
            default = cls.extension  # type: ignore
        except AttributeError:
            raise NotImplementedError("No file extension registered with this formatter") from None

        # If extension is implemented as an instance property it won't return
        # a string when called as a class property. Assume that
        # the supported extensions class property is complete.
        if default is not None and isinstance(default, str):
            supported.add(default)

        # Get the file name from the uri
        file = location.uri.basename()

        # Check that this file name ends with one of the supported extensions.
        # This is less prone to confusion than asking the location for
        # its extension and then doing a set comparison
        for ext in supported:
            if file.endswith(ext):
                return

        raise ValueError(
            f"Extension '{location.getExtension()}' on '{location}' "
            f"is not supported by Formatter '{cls.__name__}' (supports: {supported})"
        )

    def predict_path(self) -> str:
        """Return the path that would be returned by write.

        Does not write any data file.

        Uses the `FileDescriptor` associated with the instance.

        Returns
        -------
        path : `str`
            Path within datastore that would be associated with the location
            stored in this `Formatter`.
        """
        updated = self.make_updated_location(self.fileDescriptor.location)
        return updated.pathInStore.path

    def segregate_parameters(self, parameters: dict[str, Any] | None = None) -> tuple[dict, dict]:
        """Segregate the supplied parameters.

        This splits the parameters into those understood by the
        formatter and those not understood by the formatter.

        Any unsupported parameters are assumed to be usable by associated
        assemblers.

        Parameters
        ----------
        parameters : `dict`, optional
            Parameters with values that have been supplied by the caller
            and which might be relevant for the formatter. If `None`
            parameters will be read from the registered `FileDescriptor`.

        Returns
        -------
        supported : `dict`
            Those parameters supported by this formatter.
        unsupported : `dict`
            Those parameters not supported by this formatter.
        """
        if parameters is None:
            parameters = self.fileDescriptor.parameters

        if parameters is None:
            return {}, {}

        if self.unsupportedParameters is None:
            # Support none of the parameters
            return {}, parameters.copy()

        # Start by assuming all are supported
        supported = parameters.copy()
        unsupported = {}

        # And remove any we know are not supported
        for p in set(supported):
            if p in self.unsupportedParameters:
                unsupported[p] = supported.pop(p)

        return supported, unsupported

    # Support classic V1 interface.
    makeUpdatedLocation = make_updated_location
    validateExtension = validate_extension
    segregateParameters = segregate_parameters
    predictPath = predict_path

    # Compatibility with V2 properties.
    @property
    def write_parameters(self) -> Mapping[str, Any]:
        # Snake-case alias of writeParameters for FormatterV2 callers.
        return self.writeParameters

    @property
    def write_recipes(self) -> Mapping[str, Any]:
        # Snake-case alias of writeRecipes for FormatterV2 callers.
        return self.writeRecipes

    @property
    def file_descriptor(self) -> FileDescriptor:
        # Snake-case alias of fileDescriptor for FormatterV2 callers.
        return self.fileDescriptor

    @property
    def data_id(self) -> DataCoordinate:
        # Snake-case alias of dataId for FormatterV2 callers.
        return self.dataId

1727 

1728 

1729class FormatterFactory: 

1730 """Factory for `Formatter` instances.""" 

1731 

1732 defaultKey = LookupKey("default") 

1733 """Configuration key associated with default write parameter settings.""" 

1734 

1735 writeRecipesKey = LookupKey("write_recipes") 

1736 """Configuration key associated with write recipes.""" 

1737 

    def __init__(self) -> None:
        # Delegate formatter registration and lookup bookkeeping to a
        # generic MappingFactory keyed on the Formatter base class.
        self._mappingFactory = MappingFactory(Formatter)

1740 

    def __contains__(self, key: LookupKey | str) -> bool:
        """Indicate whether the supplied key is present in the factory.

        Parameters
        ----------
        key : `LookupKey`, `str` or objects with ``name`` attribute
            Key to use to lookup in the factory whether a corresponding
            formatter is present.

        Returns
        -------
        in : `bool`
            `True` if the supplied key is present in the factory.
        """
        # MappingFactory handles key normalization and the actual lookup.
        return key in self._mappingFactory

1756 

1757 def registerFormatters(self, config: Config, *, universe: DimensionUniverse) -> None: 

1758 """Bulk register formatters from a config. 

1759 

1760 Parameters 

1761 ---------- 

1762 config : `Config` 

1763 ``formatters`` section of a configuration. 

1764 universe : `DimensionUniverse`, optional 

1765 Set of all known dimensions, used to expand and validate any used 

1766 in lookup keys. 

1767 

1768 Notes 

1769 ----- 

1770 The configuration can include one level of hierarchy where an 

1771 instrument-specific section can be defined to override more general 

1772 template specifications. This is represented in YAML using a 

1773 key of form ``instrument<name>`` which can then define templates 

1774 that will be returned if a `DatasetRef` contains a matching instrument 

1775 name in the data ID. 

1776 

1777 The config is parsed using the function 

1778 `~lsst.daf.butler.configSubset.processLookupConfigs`. 

1779 

1780 The values for formatter entries can be either a simple string 

1781 referring to a python type or a dict representing the formatter and 

1782 parameters to be hard-coded into the formatter constructor. For 

1783 the dict case the following keys are supported: 

1784 

1785 - formatter: The python type to be used as the formatter class. 

1786 - parameters: A further dict to be passed directly to the 

1787 ``write_parameters`` Formatter constructor to seed it. 

1788 These parameters are validated at instance creation and not at 

1789 configuration. 

1790 

1791 Additionally, a special ``default`` section can be defined that 

1792 uses the formatter type (class) name as the keys and specifies 

1793 default write parameters that should be used whenever an instance 

1794 of that class is constructed. 

1795 

1796 .. code-block:: yaml 

1797 

1798 formatters: 

1799 default: 

1800 lsst.daf.butler.formatters.example.ExampleFormatter: 

1801 max: 10 

1802 min: 2 

1803 comment: Default comment 

1804 calexp: lsst.daf.butler.formatters.example.ExampleFormatter 

1805 coadd: 

1806 formatter: lsst.daf.butler.formatters.example.ExampleFormatter 

1807 parameters: 

1808 max: 5 

1809 

1810 Any time an ``ExampleFormatter`` is constructed it will use those 

1811 parameters. If an explicit entry later in the configuration specifies 

1812 a different set of parameters, the two will be merged with the later 

1813 entry taking priority. In the example above ``calexp`` will use 

1814 the default parameters but ``coadd`` will override the value for 

1815 ``max``. 

1816 

1817 Formatter configuration can also include a special section describing 

1818 collections of write parameters that can be accessed through a 

1819 simple label. This allows common collections of options to be 

1820 specified in one place in the configuration and reused later. 

1821 The ``write_recipes`` section is indexed by Formatter class name 

1822 and each key is the label to associate with the parameters. 

1823 

1824 .. code-block:: yaml 

1825 

1826 formatters: 

1827 write_recipes: 

1828 lsst.obs.base.formatters.fitsExposure.FixExposureFormatter: 

1829 lossless: 

1830 ... 

1831 noCompression: 

1832 ... 

1833 

1834 By convention a formatter that uses write recipes will support a 

1835 ``recipe`` write parameter that will refer to a recipe name in 

1836 the ``write_recipes`` component. The `Formatter` will be constructed 

1837 in the `FormatterFactory` with all the relevant recipes and 

1838 will not attempt to filter by looking at ``write_parameters`` in 

1839 advance. See the specific formatter documentation for details on 

1840 acceptable recipe options. 

1841 """ 

1842 allowed_keys = {"formatter", "parameters"} 

1843 

1844 contents = processLookupConfigs(config, allow_hierarchy=True, universe=universe) 

1845 

1846 # Extract any default parameter settings 

1847 defaultParameters = contents.get(self.defaultKey, {}) 

1848 if not isinstance(defaultParameters, Mapping): 

1849 raise RuntimeError( 

1850 "Default formatter parameters in config can not be a single string" 

1851 f" (got: {type(defaultParameters)})" 

1852 ) 

1853 

1854 # Extract any global write recipes -- these are indexed by 

1855 # Formatter class name. 

1856 writeRecipes = contents.get(self.writeRecipesKey, {}) 

1857 if isinstance(writeRecipes, str): 

1858 raise RuntimeError( 

1859 f"The formatters.{self.writeRecipesKey} section must refer to a dict not '{writeRecipes}'" 

1860 ) 

1861 

1862 for key, f in contents.items(): 

1863 # default is handled in a special way 

1864 if key == self.defaultKey: 

1865 continue 

1866 if key == self.writeRecipesKey: 

1867 continue 

1868 

1869 # Can be a str or a dict. 

1870 specificWriteParameters = {} 

1871 if isinstance(f, str): 

1872 formatter = f 

1873 elif isinstance(f, Mapping): 

1874 all_keys = set(f) 

1875 unexpected_keys = all_keys - allowed_keys 

1876 if unexpected_keys: 

1877 raise ValueError(f"Formatter {key} uses unexpected keys {unexpected_keys} in config") 

1878 if "formatter" not in f: 

1879 raise ValueError(f"Mandatory 'formatter' key missing for formatter key {key}") 

1880 formatter = f["formatter"] 

1881 if "parameters" in f: 

1882 specificWriteParameters = f["parameters"] 

1883 else: 

1884 raise ValueError(f"Formatter for key {key} has unexpected value: '{f}'") 

1885 

1886 # Apply any default parameters for this formatter 

1887 writeParameters = copy.deepcopy(defaultParameters.get(formatter, {})) 

1888 writeParameters.update(specificWriteParameters) 

1889 

1890 kwargs: dict[str, Any] = {} 

1891 if writeParameters: 

1892 kwargs["write_parameters"] = writeParameters 

1893 

1894 if formatter in writeRecipes: 

1895 kwargs["write_recipes"] = writeRecipes[formatter] 

1896 

1897 self.registerFormatter(key, formatter, **kwargs) 

1898 

1899 def getLookupKeys(self) -> set[LookupKey]: 

1900 """Retrieve the look up keys for all the registry entries. 

1901 

1902 Returns 

1903 ------- 

1904 keys : `set` of `LookupKey` 

1905 The keys available for matching in the registry. 

1906 """ 

1907 return self._mappingFactory.getLookupKeys() 

1908 

1909 def getFormatterClassWithMatch( 

1910 self, entity: Entity 

1911 ) -> tuple[LookupKey, type[Formatter | FormatterV2], dict[str, Any]]: 

1912 """Get the matching formatter class along with the registry key. 

1913 

1914 Parameters 

1915 ---------- 

1916 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str` 

1917 Entity to use to determine the formatter to return. 

1918 `StorageClass` will be used as a last resort if `DatasetRef` 

1919 or `DatasetType` instance is provided. Supports instrument 

1920 override if a `DatasetRef` is provided configured with an 

1921 ``instrument`` value for the data ID. 

1922 

1923 Returns 

1924 ------- 

1925 matchKey : `LookupKey` 

1926 The key that resulted in the successful match. 

1927 formatter : `type` 

1928 The class of the registered formatter. 

1929 formatter_kwargs : `dict` 

1930 Keyword arguments that are associated with this formatter entry. 

1931 """ 

1932 names = (LookupKey(name=entity),) if isinstance(entity, str) else entity._lookupNames() 

1933 matchKey, formatter, formatter_kwargs = self._mappingFactory.getClassFromRegistryWithMatch(names) 

1934 log.debug( 

1935 "Retrieved formatter %s from key '%s' for entity '%s'", 

1936 get_full_type_name(formatter), 

1937 matchKey, 

1938 entity, 

1939 ) 

1940 

1941 return matchKey, formatter, formatter_kwargs 

1942 

1943 def getFormatterClass(self, entity: Entity) -> type: 

1944 """Get the matching formatter class. 

1945 

1946 Parameters 

1947 ---------- 

1948 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str` 

1949 Entity to use to determine the formatter to return. 

1950 `StorageClass` will be used as a last resort if `DatasetRef` 

1951 or `DatasetType` instance is provided. Supports instrument 

1952 override if a `DatasetRef` is provided configured with an 

1953 ``instrument`` value for the data ID. 

1954 

1955 Returns 

1956 ------- 

1957 formatter : `type` 

1958 The class of the registered formatter. 

1959 """ 

1960 _, formatter, _ = self.getFormatterClassWithMatch(entity) 

1961 return formatter 

1962 

1963 def getFormatterWithMatch( 

1964 self, entity: Entity, *args: Any, **kwargs: Any 

1965 ) -> tuple[LookupKey, Formatter | FormatterV2]: 

1966 """Get a new formatter instance along with the matching registry key. 

1967 

1968 Parameters 

1969 ---------- 

1970 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str` 

1971 Entity to use to determine the formatter to return. 

1972 `StorageClass` will be used as a last resort if `DatasetRef` 

1973 or `DatasetType` instance is provided. Supports instrument 

1974 override if a `DatasetRef` is provided configured with an 

1975 ``instrument`` value for the data ID. 

1976 *args : `tuple` 

1977 Positional arguments to use pass to the object constructor. 

1978 **kwargs 

1979 Keyword arguments to pass to object constructor. 

1980 

1981 Returns 

1982 ------- 

1983 matchKey : `LookupKey` 

1984 The key that resulted in the successful match. 

1985 formatter : `Formatter` 

1986 An instance of the registered formatter. 

1987 """ 

1988 names = (LookupKey(name=entity),) if isinstance(entity, str) else entity._lookupNames() 

1989 matchKey, formatter = self._mappingFactory.getFromRegistryWithMatch(names, *args, **kwargs) 

1990 log.debug( 

1991 "Retrieved formatter %s from key '%s' for entity '%s'", 

1992 get_full_type_name(formatter), 

1993 matchKey, 

1994 entity, 

1995 ) 

1996 

1997 return matchKey, formatter 

1998 

1999 def getFormatter(self, entity: Entity, *args: Any, **kwargs: Any) -> Formatter | FormatterV2: 

2000 """Get a new formatter instance. 

2001 

2002 Parameters 

2003 ---------- 

2004 entity : `DatasetRef`, `DatasetType`, `StorageClass`, or `str` 

2005 Entity to use to determine the formatter to return. 

2006 `StorageClass` will be used as a last resort if `DatasetRef` 

2007 or `DatasetType` instance is provided. Supports instrument 

2008 override if a `DatasetRef` is provided configured with an 

2009 ``instrument`` value for the data ID. 

2010 *args : `tuple` 

2011 Positional arguments to use pass to the object constructor. 

2012 **kwargs 

2013 Keyword arguments to pass to object constructor. 

2014 

2015 Returns 

2016 ------- 

2017 formatter : `Formatter` 

2018 An instance of the registered formatter. 

2019 """ 

2020 _, formatter = self.getFormatterWithMatch(entity, *args, **kwargs) 

2021 return formatter 

2022 

2023 def registerFormatter( 

2024 self, 

2025 type_: LookupKey | str | StorageClass | DatasetType, 

2026 formatter: str, 

2027 *, 

2028 overwrite: bool = False, 

2029 **kwargs: Any, 

2030 ) -> None: 

2031 """Register a `Formatter`. 

2032 

2033 Parameters 

2034 ---------- 

2035 type_ : `LookupKey`, `str`, `StorageClass` or `DatasetType` 

2036 Type for which this formatter is to be used. If a `LookupKey` 

2037 is not provided, one will be constructed from the supplied string 

2038 or by using the ``name`` property of the supplied entity. 

2039 formatter : `str` or class of type `Formatter` 

2040 Identifies a `Formatter` subclass to use for reading and writing 

2041 Datasets of this type. Can be a `Formatter` class. 

2042 overwrite : `bool`, optional 

2043 If `True` an existing entry will be replaced by the new value. 

2044 Default is `False`. 

2045 **kwargs 

2046 Keyword arguments to always pass to object constructor when 

2047 retrieved. 

2048 

2049 Raises 

2050 ------ 

2051 ValueError 

2052 Raised if the formatter does not name a valid formatter type and 

2053 ``overwrite`` is `False`. 

2054 """ 

2055 self._mappingFactory.placeInRegistry(type_, formatter, overwrite=overwrite, **kwargs) 

2056 

2057 

class FormatterV1inV2(FormatterV2):
    """A `FormatterV2` shim that wraps a version 1 `Formatter` instance and
    forwards all serialization calls to it.

    Parameters
    ----------
    file_descriptor : `FileDescriptor`, optional
        Identifies the file to read or write, and the associated storage
        classes and parameter information. Its value can be `None` if the
        caller will never call `Formatter.read` or `Formatter.write`.
    ref : `DatasetRef`
        The dataset associated with this formatter. Should not be a component
        dataset ref.
    formatter : `Formatter`
        A version 1 `Formatter` instance. The V2 formatter layer forwards
        calls to this formatter.
    write_parameters : `dict`, optional
        Any parameters to be hard-coded into this instance to control how
        the dataset is serialized.
    write_recipes : `dict`, optional
        Detailed write recipes indexed by recipe name.
    **kwargs
        Additional arguments that will be ignored but allow for
        `Formatter` V1 parameters to be given.
    """

    can_read_from_local_file = True
    """This formatter can read from a local file."""

    def __init__(
        self,
        file_descriptor: FileDescriptor,
        *,
        ref: DatasetRef,
        formatter: Formatter,
        write_parameters: Mapping[str, Any] | None = None,
        write_recipes: Mapping[str, Any] | None = None,
        # Compatibility parameters. Unused in v2.
        **kwargs: Any,
    ):
        if not isinstance(formatter, Formatter):
            raise TypeError(f"Formatter parameter was not a V1 formatter (was {type(formatter)})")

        # Shadow the class-level property with the wrapped formatter's
        # supported write parameters before super().__init__ runs, so the
        # V2 constructor validates against the V1 formatter's set.
        self.supported_write_parameters = formatter.supportedWriteParameters  # type: ignore
        self._formatter = formatter

        super().__init__(
            file_descriptor, ref=ref, write_parameters=write_parameters, write_recipes=write_recipes
        )

    def get_write_extension(self) -> str:
        # The wrapped V1 formatter may report no extension at all; V2
        # callers always expect a string.
        extension = self._formatter.extension
        if extension is None:
            return ""
        return extension

    def segregate_parameters(self, parameters: dict[str, Any] | None = None) -> tuple[dict, dict]:
        # Delegate so the V1 formatter's notion of supported and
        # unsupported parameters is the one that applies.
        return self._formatter.segregate_parameters(parameters)

    def validate_write_recipes(  # type: ignore
        self,
        recipes: Mapping[str, Any] | None,
    ) -> Mapping[str, Any] | None:
        # This should be a class method but a class method can not work
        # for a dynamic shim. Luckily the shim is only used as an instance.
        return self._formatter.validate_write_recipes(recipes)

    def read_from_local_file(self, path: str, component: str | None = None, expected_size: int = -1) -> Any:
        wrapped = self._formatter

        # Prefer the bytes interface when the V1 formatter offers it;
        # V2 has no fromBytes equivalent, so read the file here.
        if wrapped.can_read_bytes():
            with open(path, "rb") as stream:
                payload = stream.read()
            return wrapped.fromBytes(payload, component=component)

        # Temporarily point the V1 formatter at this local file, which it
        # otherwise knows nothing about.
        with wrapped._updateLocation(Location(None, path)):
            try:
                return wrapped.read(component=component)
            except NotImplementedError:
                # V1 signals an unimplemented read by raising; V2 expects
                # the NotImplemented sentinel instead.
                return NotImplemented

    def to_bytes(self, in_memory_dataset: Any) -> bytes:
        try:
            return self._formatter.toBytes(in_memory_dataset)
        except NotImplementedError as exc:
            # Translate the V1 exception into the V2 equivalent.
            raise FormatterNotImplementedError(str(exc)) from exc

    def write_local_file(self, in_memory_dataset: Any, uri: ResourcePath) -> None:
        target = Location(None, uri)
        # The V1 formatter must be told about the local destination before
        # it can write there.
        with self._formatter._updateLocation(target):
            try:
                self._formatter.write(in_memory_dataset)
            except NotImplementedError as exc:
                # Translate the V1 exception into the V2 equivalent.
                raise FormatterNotImplementedError(str(exc)) from exc

2161 

2162 

# Type to use when allowing a Formatter or its class name: accepts a
# fully qualified class-name string, a V1 or V2 formatter class, or an
# instance of either.
FormatterParameter: TypeAlias = str | type[Formatter] | Formatter | FormatterV2 | type[FormatterV2]