Coverage for python / lsst / images / fits / _input_archive.py: 32%

208 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 09:01 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "FitsInputArchive", 

16 "FitsOpaqueMetadata", 

17 "ReadResult", 

18 "read", 

19) 

20 

21import io 

22import os 

23from collections.abc import Callable, Iterator 

24from contextlib import contextmanager 

25from functools import cached_property 

26from types import EllipsisType 

27from typing import IO, Any, NamedTuple, Self 

28 

29import astropy.io.fits 

30import astropy.table 

31import fsspec 

32import numpy as np 

33 

34from lsst.resources import ResourcePath, ResourcePathExpression 

35 

36from .._transforms import FrameSet 

37from ..serialization import ( 

38 ArchiveTree, 

39 ArrayReferenceModel, 

40 ButlerInfo, 

41 InputArchive, 

42 MetadataValue, 

43 TableCellReferenceModel, 

44 TableReferenceModel, 

45 no_header_updates, 

46) 

47from ._common import ExtensionHDU, ExtensionKey, FitsOpaqueMetadata, InvalidFitsArchiveError 

48 

49 

class ReadResult[T: Any](NamedTuple):
    """Struct used as the return value for `read`.

    Bundles the deserialized object with the flexible metadata and butler
    provenance information stored alongside it in the file.
    """

    deserialized: T
    """The deserialized object itself."""

    metadata: dict[str, MetadataValue]
    """Additional flexible metadata stored with the object."""

    butler_info: ButlerInfo | None
    """Butler provenance information for the dataset this file backs."""

62 

63def read[T: Any]( 

64 cls: type[T], 

65 path: ResourcePathExpression, 

66 *, 

67 page_size: int = 2880 * 50, 

68 partial: bool | None = None, 

69 **kwargs: Any, 

70) -> ReadResult[T]: 

71 """Read an object from a FITS file. 

72 

73 Parameters 

74 ---------- 

75 path 

76 File to read; convertible to `lsst.resources.ResourcePath`. 

77 page_size 

78 Minimum number of bytes to read at at once. Making this a multiple 

79 of the FITS block size (2880) is recommended. 

80 partial 

81 Whether we will be reading only some of the archive, or if memory 

82 pressure forces us to read it only a little at a time. If `False`, 

83 the entire raw file may be read into memory up front. Defaults to 

84 `True` if any extra ``**kwargs`` are passed with values other than 

85 `None`, since those usually indicate that only some of the original 

86 object will be loaded. 

87 **kwargs 

88 Extra keyword arguments passed to ``cls.deserialize``. 

89 

90 Returns 

91 ------- 

92 ReadResult 

93 A named tuple containing the deserialized object and any additional 

94 metadata or butler information saved alongside it. 

95 

96 Notes 

97 ----- 

98 Supported types must implement ``deserialize`` and 

99 ``_get_archive_tree_type`` (see `.Image` for an example). 

100 """ 

101 if partial is None: 

102 partial = any(v is not None for v in kwargs.values()) 

103 with FitsInputArchive.open(path, page_size=page_size, partial=partial) as archive: 

104 tree = archive.get_tree(cls._get_archive_tree_type(TableCellReferenceModel)) 

105 obj = cls.deserialize(tree, archive, **kwargs) 

106 obj._opaque_metadata = archive.get_opaque_metadata() 

107 return ReadResult(obj, tree.metadata, tree.butler_info) 

108 

109 

110class FitsInputArchive(InputArchive[TableCellReferenceModel]): 

111 """An implementation of the `.serialization.InputArchive` interface that 

112 reads from FITS files. 

113 

114 Instances of this class should only be constructed via the `open` 

115 context manager. 

116 """ 

117 

118 def __init__(self, stream: IO[bytes]): 

119 self._primary_hdu = astropy.io.fits.PrimaryHDU.readfrom(stream) 

120 # TODO: read and strip subformat declaration and version, once we start 

121 # writing those. 

122 # 

123 # TODO: do some basic checks that the file format conforms to our 

124 # expectations (e.g. primary HDU should have no data). 

125 # 

126 # Read and strip the addresses and sizes from the headers. We don't 

127 # actually need the index address because we always want to read the 

128 # JSON HDU, too, and the index HDU is always the next one (but this 

129 # could change in the future). 

130 json_address: int = self._primary_hdu.header.pop("JSONADDR") 

131 json_size: int = self._primary_hdu.header.pop("JSONSIZE") 

132 del self._primary_hdu.header["INDXADDR"] 

133 index_size: int = self._primary_hdu.header.pop("INDXSIZE") 

134 # Save the remaining primary header keys so we can propagate them on 

135 # rewrite. 

136 self._opaque_metadata = FitsOpaqueMetadata() 

137 self._opaque_metadata.add_header(self._primary_hdu.header.copy(strip=True), name="", ver=1) 

138 # Read the JSON and index HDUs from the end. 

139 stream.seek(json_address) 

140 tail_data = stream.read(json_size + index_size) 

141 index_hdu = astropy.io.fits.BinTableHDU.fromstring(tail_data[json_size:]) 

142 # Initialize lazy readers for all of the regular HDUs and the JSON HDU. 

143 self._readers = { 

144 ExtensionKey.from_index_row(row): _ExtensionReader.from_index_row(row, stream) 

145 for row in index_hdu.data 

146 } 

147 self._readers[ExtensionKey("JSON")] = _ExtensionReader.from_bytes( 

148 astropy.io.fits.BinTableHDU, tail_data[:json_size] 

149 ) 

150 # Make any empty dictionary to cache deserialized objects. 

151 self._deserialized_pointer_cache: dict[TableCellReferenceModel, Any] = {} 

152 

153 @classmethod 

154 @contextmanager 

155 def open( 

156 cls, 

157 path: ResourcePathExpression, 

158 *, 

159 page_size: int = 2880 * 50, 

160 partial: bool = False, 

161 ) -> Iterator[Self]: 

162 """Create an output archive that writes to the given file. 

163 

164 Parameters 

165 ---------- 

166 path 

167 File to read; convertible to `lsst.resources.ResourcePath`. 

168 page_size 

169 Minimum number of bytes to read at at once. Making this a multiple 

170 of the FITS block size (2880) is recommended. 

171 partial 

172 Whether we will be reading only some of the archive, or if memory 

173 pressure forces us to read it only a little at a time. If `False` 

174 (default), the entire raw file may be read into memory up front. 

175 

176 Returns 

177 ------- 

178 `contextlib.AbstractContextManager` [`FitsInputArchive`] 

179 A context manager that returns a `FitsInputArchive` when entered. 

180 """ 

181 path = ResourcePath(path) 

182 stream: IO[bytes] 

183 if not partial: 

184 stream = io.BytesIO(path.read()) 

185 yield cls(stream) 

186 else: 

187 fs: fsspec.AbstractFileSystem 

188 fs, fp = path.to_fsspec() 

189 with fs.open(fp, block_size=page_size) as stream: 

190 yield cls(stream) 

191 

192 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T: 

193 """Read the JSON tree from the archive. 

194 

195 Parameters 

196 ---------- 

197 model_type 

198 A Pydantic model type to use to validate the JSON. 

199 

200 Returns 

201 ------- 

202 T 

203 The validated Pydantic model. 

204 """ 

205 json_bytes = self._readers[ExtensionKey("JSON")].data[0]["JSON"].tobytes() 

206 return model_type.model_validate_json(json_bytes) 

207 

208 def deserialize_pointer[U: ArchiveTree, V]( 

209 self, 

210 pointer: TableCellReferenceModel, 

211 model_type: type[U], 

212 deserializer: Callable[[U, InputArchive[TableCellReferenceModel]], V], 

213 ) -> V: 

214 # Docstring inherited. 

215 if (cached := self._deserialized_pointer_cache.get(pointer)) is not None: 

216 return cached 

217 _, reader = self._get_source_reader(pointer) 

218 try: 

219 json_bytes = reader.data[pointer.row][pointer.column].tobytes() 

220 except Exception as err: 

221 raise InvalidFitsArchiveError( 

222 f"Failed to access the table cell referenced by {pointer.model_dump_json()}." 

223 ) from err 

224 result = deserializer(model_type.model_validate_json(json_bytes), self) 

225 self._deserialized_pointer_cache[pointer] = result 

226 return result 

227 

228 def get_frame_set(self, ref: TableCellReferenceModel) -> FrameSet: 

229 try: 

230 result = self._deserialized_pointer_cache[ref] 

231 except KeyError: 

232 raise AssertionError( 

233 f"Frame set at {ref.model_dump_json(indent=2)} must be deserialized " 

234 "before any dependent transform can be." 

235 ) from None 

236 if not isinstance(result, FrameSet): 

237 raise InvalidFitsArchiveError(f"Expected a FrameSet instance at {ref.model_dump_json(indent=2)}.") 

238 return result 

239 

240 def get_array( 

241 self, 

242 ref: ArrayReferenceModel, 

243 *, 

244 slices: tuple[slice, ...] | EllipsisType = ..., 

245 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

246 ) -> np.ndarray: 

247 key, reader = self._get_source_reader(ref) 

248 if slices is not ...: 

249 array = reader.section[slices] 

250 else: 

251 array = reader.data 

252 if key not in self._opaque_metadata.headers: 

253 opaque_header = reader.header.copy(strip=True) 

254 strip_header(opaque_header) 

255 self._opaque_metadata.add_header(opaque_header, key=key) 

256 return array 

257 

258 def get_table( 

259 self, 

260 ref: TableReferenceModel, 

261 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

262 ) -> astropy.table.Table: 

263 # Docstring inherited. 

264 array = self.get_structured_array(ref, strip_header) 

265 table = astropy.table.Table(array) 

266 for c in ref.columns: 

267 c.update_table(table) 

268 return table 

269 

270 def get_structured_array( 

271 self, 

272 ref: TableReferenceModel, 

273 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

274 ) -> np.ndarray: 

275 # Docstring inherited. 

276 key, reader = self._get_source_reader(ref) 

277 if key not in self._opaque_metadata.headers: 

278 opaque_header = reader.header.copy(strip=True) 

279 strip_header(opaque_header) 

280 self._opaque_metadata.add_header(opaque_header, key=key) 

281 return reader.hdu.data 

282 

283 def get_opaque_metadata(self) -> FitsOpaqueMetadata: 

284 # Docstring inherited. 

285 return self._opaque_metadata 

286 

287 def _get_source_reader( 

288 self, ref: ArrayReferenceModel | TableCellReferenceModel | TableReferenceModel 

289 ) -> tuple[ExtensionKey, _ExtensionReader]: 

290 """Get a reader for the extension referenced by a model. 

291 

292 Parameters 

293 ---------- 

294 ref 

295 A Pydantic model with a 'source' field that references a FITS HDU. 

296 

297 Returns 

298 ------- 

299 key 

300 Identifier pair for the HDU (EXTNAME, EXTVER). 

301 reader 

302 A reader object for the extension. 

303 """ 

304 if not isinstance(ref.source, str): 

305 raise InvalidFitsArchiveError(f"Reference with source={ref.source!r} is not a string.") 

306 if not ref.source.startswith("fits:"): 

307 raise InvalidFitsArchiveError( 

308 f"Reference with source={ref.source!r} does not start with 'fits:'." 

309 ) 

310 key = ExtensionKey.from_str(ref.source) 

311 try: 

312 reader = self._readers[key] 

313 except KeyError: 

314 raise InvalidFitsArchiveError(f"Unrecognized source value {key}.") from None 

315 if ref.source_is_table and not reader.is_table: 

316 raise InvalidFitsArchiveError( 

317 f"Extension with source={key} was expected to be be a binary table, not an image." 

318 ) 

319 elif not ref.source_is_table and reader.is_table: 

320 raise InvalidFitsArchiveError( 

321 f"Extension with source={key} was expected to be be an image, not a binary table." 

322 ) 

323 return key, reader 

324 

325 

326class _ExtensionReader: 

327 """A lazy-load reader for a single extension HDU. 

328 

329 Parameters 

330 ---------- 

331 hdu_cls 

332 The type of the astropy HDU instance to construct. 

333 stream 

334 The file-like object to read from. 

335 """ 

336 

337 def __init__(self, hdu_cls: type[ExtensionHDU], stream: IO[bytes]): 

338 self._hdu_cls = hdu_cls 

339 self._stream = stream 

340 

341 @classmethod 

342 def from_index_row(cls, index_row: np.void, stream: IO[bytes]) -> _ExtensionReader: 

343 """Construct from a row of the binary table index HDU. 

344 

345 Parameters 

346 ---------- 

347 index_row 

348 A record array row from the index HDU. 

349 stream 

350 The file-like object being used to read the full FITS file. 

351 

352 Returns 

353 ------- 

354 reader 

355 A reader object for the extension. 

356 """ 

357 match index_row["XTENSION"].strip(): 

358 case "IMAGE": 

359 hdu_cls = astropy.io.fits.ImageHDU 

360 case "BINTABLE": 

361 if index_row["ZIMAGE"]: 

362 hdu_cls = astropy.io.fits.CompImageHDU 

363 else: 

364 hdu_cls = astropy.io.fits.BinTableHDU 

365 case other: 

366 raise AssertionError(f"Unsupported HDU type {other!r}.") 

367 return _ExtensionReader( 

368 hdu_cls, 

369 _RangeStreamProxy( 

370 stream, 

371 start=int(index_row["HDRADDR"]), 

372 ), 

373 ) 

374 

375 @classmethod 

376 def from_bytes(cls, hdu_cls: type[ExtensionHDU], data: bytes) -> _ExtensionReader: 

377 """Construct from already-read `bytes` 

378 

379 Parameters 

380 ---------- 

381 hdu_cls 

382 The HDU type to instantiate. 

383 data 

384 Raw data for the HDU. 

385 

386 Returns 

387 ------- 

388 reader 

389 A reader object for extension. 

390 """ 

391 return _ExtensionReader(hdu_cls, io.BytesIO(data)) 

392 

393 @property 

394 def is_table(self) -> bool: 

395 """Whether this is logically a table HDU. 

396 

397 This is `False` for compressed image HDUs, even though they are 

398 represented in FITS as a binary table. 

399 """ 

400 return issubclass(self._hdu_cls, astropy.io.fits.BinTableHDU) and not issubclass( 

401 self._hdu_cls, astropy.io.fits.CompImageHDU 

402 ) 

403 

404 @cached_property 

405 def hdu(self) -> ExtensionHDU: 

406 """The Astropy HDU object.""" 

407 self._stream.seek(0) 

408 if self._hdu_cls is astropy.io.fits.CompImageHDU: 

409 # CompImageHDU.readfrom doesn't work; we need to make a minimal 

410 # example and report it upstream. Happily this workaround does 

411 # work. 

412 bintable_hdu = astropy.io.fits.BinTableHDU.readfrom(self._stream, memmap=False, cache=False) 

413 return self._hdu_cls(bintable=bintable_hdu) 

414 else: 

415 return self._hdu_cls.readfrom(self._stream, memmap=False, cache=False, uint=True) 

416 

417 @property 

418 def header(self) -> astropy.io.fits.Header: 

419 """The header of the HDU.""" 

420 return self.hdu.header 

421 

422 @property 

423 def data(self) -> np.ndarray: 

424 """The data for the HDU.""" 

425 return self.hdu.data 

426 

427 @property 

428 def section(self) -> astropy.io.fits.Section | astropy.io.fits.CompImageSection: 

429 """An Astropy expression object that reads a subset of the data when 

430 sliced. 

431 """ 

432 return self.hdu.section 

433 

434 

435class _RangeStreamProxy(IO[bytes]): 

436 """A readable IO proxy object that makes the beginning of the file appear 

437 at a custom position. 

438 

439 Parameters 

440 ---------- 

441 base 

442 Underlying readable, seekable buffer to proxy. 

443 start 

444 Offset into the base stream that will be considered the start of the 

445 proxy stream. 

446 

447 Notes 

448 ----- 

449 This class exists because Astropy doesn't seem to provide a way to read a 

450 single HDU that starts at the current seek position of a file-like object. 

451 It does provide ``readfrom`` methods on its HDU objects that take a 

452 file-like object, but these assume (possibly unintentionally; it only 

453 happens when Astropy is trying to see whether the file was opened for 

454 appending) that ``seek(0)`` will set the file-like object to the start of 

455 the HDU. 

456 """ 

457 

458 def __init__(self, base: IO[bytes], start: int): 

459 self._base = base 

460 self._start = start 

461 

462 @property 

463 def mode(self) -> str: 

464 return "rb" 

465 

466 def __enter__(self) -> Self: 

467 raise AssertionError("This proxy should not be used as a context manager.") 

468 

469 def __exit__(self, type: Any, value: Any, traceback: Any) -> None: 

470 raise AssertionError("This proxy should not be used as a context manager.") 

471 

472 def __iter__(self) -> Iterator[bytes]: 

473 return self._base.__iter__() 

474 

475 def __next__(self) -> bytes: 

476 return self._base.__next__() 

477 

478 def close(self) -> None: 

479 raise AssertionError("This proxy should not ever be closed.") 

480 

481 @property 

482 def closed(self) -> bool: 

483 return False 

484 

485 def fileno(self) -> int: 

486 raise OSError() 

487 

488 def flush(self) -> None: 

489 pass 

490 

491 def isatty(self) -> bool: 

492 return False 

493 

494 def read(self, n: int = -1, /) -> bytes: 

495 result = self._base.read(n) 

496 return result 

497 

498 def readable(self) -> bool: 

499 return True 

500 

501 def readline(self, limit: int = -1, /) -> bytes: 

502 return self._base.readline(limit) 

503 

504 def readlines(self, hint: int = -1, /) -> list[bytes]: 

505 return self._base.readlines(hint) 

506 

507 def seek(self, offset: int, whence: int = 0) -> int: 

508 match whence: 

509 case os.SEEK_SET: 

510 return self._base.seek(offset + self._start, os.SEEK_SET) - self._start 

511 case os.SEEK_CUR: 

512 return self._base.seek(offset, os.SEEK_CUR) - self._start 

513 case os.SEEK_END: 

514 return self._base.seek(offset, os.SEEK_END) - self._start 

515 raise TypeError(f"Invalid value for 'whence': {whence}.") 

516 

517 def seekable(self) -> bool: 

518 return True 

519 

520 def tell(self) -> int: 

521 return self._base.tell() - self._start 

522 

523 def truncate(self, size: int | None = None, /) -> int: 

524 raise OSError() 

525 

526 def writable(self) -> bool: 

527 return False 

528 

529 def write(self, arg: Any, /) -> int: 

530 raise OSError() 

531 

532 def writelines(self, arg: Any, /) -> None: 

533 raise OSError()