Coverage for python / lsst / images / fits / _input_archive.py: 29%

207 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-30 09:07 +0000

1# This file is part of lsst-images. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (https://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# Use of this source code is governed by a 3-clause BSD-style 

10# license that can be found in the LICENSE file. 

11 

12from __future__ import annotations 

13 

14__all__ = ( 

15 "FitsInputArchive", 

16 "FitsOpaqueMetadata", 

17 "read", 

18) 

19 

20import io 

21import os 

22from collections.abc import Callable, Iterator 

23from contextlib import contextmanager 

24from functools import cached_property 

25from types import EllipsisType 

26from typing import IO, Any, Self 

27 

28import astropy.io.fits 

29import astropy.table 

30import fsspec 

31import numpy as np 

32 

33from lsst.resources import ResourcePath, ResourcePathExpression 

34 

35from .._transforms import FrameSet 

36from ..serialization import ( 

37 ArchiveReadError, 

38 ArchiveTree, 

39 ArrayReferenceModel, 

40 InlineArrayModel, 

41 InputArchive, 

42 ReadResult, 

43 TableModel, 

44 no_header_updates, 

45) 

46from ._common import ( 

47 JSON_COLUMN, 

48 JSON_EXTNAME, 

49 ExtensionHDU, 

50 ExtensionKey, 

51 FitsOpaqueMetadata, 

52 InvalidFitsArchiveError, 

53 PointerModel, 

54) 

55 

56 

57def read[T: Any]( 

58 cls: type[T], 

59 path: ResourcePathExpression, 

60 *, 

61 page_size: int = 2880 * 50, 

62 partial: bool | None = None, 

63 **kwargs: Any, 

64) -> ReadResult[T]: 

65 """Read an object from a FITS file. 

66 

67 Parameters 

68 ---------- 

69 path 

70 File to read; convertible to `lsst.resources.ResourcePath`. 

71 page_size 

72 Minimum number of bytes to read at at once. Making this a multiple 

73 of the FITS block size (2880) is recommended. 

74 partial 

75 Whether we will be reading only some of the archive, or if memory 

76 pressure forces us to read it only a little at a time. If `False`, 

77 the entire raw file may be read into memory up front. Defaults to 

78 `True` if any extra ``**kwargs`` are passed with values other than 

79 `None`, since those usually indicate that only some of the original 

80 object will be loaded. 

81 **kwargs 

82 Extra keyword arguments passed to ``cls.deserialize``. 

83 

84 Returns 

85 ------- 

86 ReadResult 

87 A named tuple containing the deserialized object and any additional 

88 metadata or butler information saved alongside it. 

89 

90 Notes 

91 ----- 

92 Supported types must implement ``deserialize`` and 

93 ``_get_archive_tree_type`` (see `.Image` for an example). 

94 """ 

95 if partial is None: 

96 partial = any(v is not None for v in kwargs.values()) 

97 with FitsInputArchive.open(path, page_size=page_size, partial=partial) as archive: 

98 tree = archive.get_tree(cls._get_archive_tree_type(PointerModel)) 

99 obj = cls.deserialize(tree, archive, **kwargs) 

100 obj._opaque_metadata = archive.get_opaque_metadata() 

101 return ReadResult(obj, tree.metadata, tree.butler_info) 

102 

103 

104class FitsInputArchive(InputArchive[PointerModel]): 

105 """An implementation of the `.serialization.InputArchive` interface that 

106 reads from FITS files. 

107 

108 Instances of this class should only be constructed via the `open` 

109 context manager. 

110 """ 

111 

112 def __init__(self, stream: IO[bytes]): 

113 self._primary_hdu = astropy.io.fits.PrimaryHDU.readfrom(stream) 

114 # TODO: read and strip subformat declaration and version, once we start 

115 # writing those. 

116 # 

117 # TODO: do some basic checks that the file format conforms to our 

118 # expectations (e.g. primary HDU should have no data). 

119 # 

120 # Read and strip the addresses and sizes from the headers. We don't 

121 # actually need the index address because we always want to read the 

122 # JSON HDU, too, and the index HDU is always the next one (but this 

123 # could change in the future). 

124 json_address: int = self._primary_hdu.header.pop("JSONADDR") 

125 json_size: int = self._primary_hdu.header.pop("JSONSIZE") 

126 del self._primary_hdu.header["INDXADDR"] 

127 index_size: int = self._primary_hdu.header.pop("INDXSIZE") 

128 # Save the remaining primary header keys so we can propagate them on 

129 # rewrite. 

130 self._opaque_metadata = FitsOpaqueMetadata() 

131 self._opaque_metadata.add_header(self._primary_hdu.header.copy(strip=True), name="", ver=1) 

132 # Read the JSON and index HDUs from the end. 

133 stream.seek(json_address) 

134 tail_data = stream.read(json_size + index_size) 

135 index_hdu = astropy.io.fits.BinTableHDU.fromstring(tail_data[json_size:]) 

136 # Initialize lazy readers for all of the regular HDUs and the JSON HDU. 

137 self._readers = { 

138 ExtensionKey.from_index_row(row): _ExtensionReader.from_index_row(row, stream) 

139 for row in index_hdu.data 

140 } 

141 self._readers[ExtensionKey(JSON_COLUMN)] = _ExtensionReader.from_bytes( 

142 astropy.io.fits.BinTableHDU, tail_data[:json_size] 

143 ) 

144 # Make any empty dictionary to cache deserialized objects. Keys are 

145 # the zero-indexed row in the JSON table. 

146 self._deserialized_pointer_cache: dict[int, Any] = {} 

147 

148 @classmethod 

149 @contextmanager 

150 def open( 

151 cls, 

152 path: ResourcePathExpression, 

153 *, 

154 page_size: int = 2880 * 50, 

155 partial: bool = False, 

156 ) -> Iterator[Self]: 

157 """Create an output archive that writes to the given file. 

158 

159 Parameters 

160 ---------- 

161 path 

162 File to read; convertible to `lsst.resources.ResourcePath`. 

163 page_size 

164 Minimum number of bytes to read at at once. Making this a multiple 

165 of the FITS block size (2880) is recommended. 

166 partial 

167 Whether we will be reading only some of the archive, or if memory 

168 pressure forces us to read it only a little at a time. If `False` 

169 (default), the entire raw file may be read into memory up front. 

170 

171 Returns 

172 ------- 

173 `contextlib.AbstractContextManager` [`FitsInputArchive`] 

174 A context manager that returns a `FitsInputArchive` when entered. 

175 """ 

176 path = ResourcePath(path) 

177 stream: IO[bytes] 

178 if not partial: 

179 stream = io.BytesIO(path.read()) 

180 yield cls(stream) 

181 else: 

182 fs: fsspec.AbstractFileSystem 

183 fs, fp = path.to_fsspec() 

184 with fs.open(fp, block_size=page_size) as stream: 

185 yield cls(stream) 

186 

187 def get_tree[T: ArchiveTree](self, model_type: type[T]) -> T: 

188 """Read the JSON tree from the archive. 

189 

190 Parameters 

191 ---------- 

192 model_type 

193 A Pydantic model type to use to validate the JSON. 

194 

195 Returns 

196 ------- 

197 T 

198 The validated Pydantic model. 

199 """ 

200 json_bytes = self._readers[ExtensionKey(JSON_EXTNAME)].data[0][JSON_COLUMN].tobytes() 

201 return model_type.model_validate_json(json_bytes) 

202 

203 def deserialize_pointer[U: ArchiveTree, V]( 

204 self, 

205 pointer: PointerModel, 

206 model_type: type[U], 

207 deserializer: Callable[[U, InputArchive[PointerModel]], V], 

208 ) -> V: 

209 # Docstring inherited. 

210 if (cached := self._deserialized_pointer_cache.get(pointer.row)) is not None: 

211 return cached 

212 if not isinstance(pointer.column.data, ArrayReferenceModel): 

213 raise ArchiveReadError(f"Invalid pointer with inline array:\n{pointer.model_dump_json(indent=2)}") 

214 _, reader = self._get_source_reader(pointer.column.data.source, is_table=True) 

215 try: 

216 json_bytes = reader.data[pointer.row][JSON_COLUMN].tobytes() 

217 except Exception as err: 

218 raise InvalidFitsArchiveError( 

219 f"Failed to access the table cell referenced by {pointer.model_dump_json()}." 

220 ) from err 

221 result = deserializer(model_type.model_validate_json(json_bytes), self) 

222 self._deserialized_pointer_cache[pointer.row] = result 

223 return result 

224 

225 def get_frame_set(self, ref: PointerModel) -> FrameSet: 

226 try: 

227 result = self._deserialized_pointer_cache[ref.row] 

228 except KeyError: 

229 raise AssertionError( 

230 f"Frame set at {ref.model_dump_json(indent=2)} must be deserialized " 

231 "before any dependent transform can be." 

232 ) from None 

233 if not isinstance(result, FrameSet): 

234 raise InvalidFitsArchiveError(f"Expected a FrameSet instance at {ref.model_dump_json(indent=2)}.") 

235 return result 

236 

237 def get_array( 

238 self, 

239 model: ArrayReferenceModel | InlineArrayModel, 

240 *, 

241 slices: tuple[slice, ...] | EllipsisType = ..., 

242 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

243 ) -> np.ndarray: 

244 if not isinstance(model, ArrayReferenceModel): 

245 raise ArchiveReadError("Inline array found where a reference array was expected.") 

246 key, reader = self._get_source_reader(model.source, is_table=False) 

247 if slices is not ...: 

248 array = reader.section[slices] 

249 else: 

250 array = reader.data 

251 if key not in self._opaque_metadata.headers: 

252 opaque_header = reader.header.copy(strip=True) 

253 strip_header(opaque_header) 

254 self._opaque_metadata.add_header(opaque_header, key=key) 

255 return array 

256 

257 def get_table( 

258 self, 

259 model: TableModel, 

260 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

261 ) -> astropy.table.Table: 

262 # Docstring inherited. 

263 array = self.get_structured_array(model, strip_header) 

264 table = astropy.table.Table(array) 

265 for c in model.columns: 

266 c.update_table(table) 

267 return table 

268 

269 def get_structured_array( 

270 self, 

271 model: TableModel, 

272 strip_header: Callable[[astropy.io.fits.Header], None] = no_header_updates, 

273 ) -> np.ndarray: 

274 # Docstring inherited. 

275 if not isinstance(model.columns[0].data, ArrayReferenceModel): 

276 raise ArchiveReadError("Inline array found where a reference array was expected.") 

277 # All columns should have the same data.source; just use the first. 

278 key, reader = self._get_source_reader(model.columns[0].data.source, is_table=True) 

279 if key not in self._opaque_metadata.headers: 

280 opaque_header = reader.header.copy(strip=True) 

281 strip_header(opaque_header) 

282 self._opaque_metadata.add_header(opaque_header, key=key) 

283 return reader.hdu.data 

284 

285 def get_opaque_metadata(self) -> FitsOpaqueMetadata: 

286 # Docstring inherited. 

287 return self._opaque_metadata 

288 

289 def _get_source_reader(self, source: str | int, is_table: bool) -> tuple[ExtensionKey, _ExtensionReader]: 

290 """Get a reader for the extension referenced by a serialiation model's 

291 ``source`` field. 

292 

293 Parameters 

294 ---------- 

295 source 

296 A ``source`` field of the form ``fits:${hdu}`` or 

297 ``fits:${hdu}[${col}]``. 

298 is_table 

299 Whether the source should be for a table HDU. 

300 

301 Returns 

302 ------- 

303 key 

304 Identifier pair for the HDU (EXTNAME, EXTVER). 

305 reader 

306 A reader object for the extension. 

307 """ 

308 if not isinstance(source, str): 

309 raise InvalidFitsArchiveError(f"Reference with source={source!r} is not a string.") 

310 if not source.startswith("fits:"): 

311 raise InvalidFitsArchiveError(f"Reference with source={source!r} does not start with 'fits:'.") 

312 key = ExtensionKey.from_str(source) 

313 try: 

314 reader = self._readers[key] 

315 except KeyError: 

316 raise InvalidFitsArchiveError(f"Unrecognized source value {key}.") from None 

317 if is_table and not reader.is_table: 

318 raise InvalidFitsArchiveError( 

319 f"Extension with source={key} was expected to be be a binary table, not an image." 

320 ) 

321 elif not is_table and reader.is_table: 

322 raise InvalidFitsArchiveError( 

323 f"Extension with source={key} was expected to be be an image, not a binary table." 

324 ) 

325 return key, reader 

326 

327 

328class _ExtensionReader: 

329 """A lazy-load reader for a single extension HDU. 

330 

331 Parameters 

332 ---------- 

333 hdu_cls 

334 The type of the astropy HDU instance to construct. 

335 stream 

336 The file-like object to read from. 

337 """ 

338 

339 def __init__(self, hdu_cls: type[ExtensionHDU], stream: IO[bytes]): 

340 self._hdu_cls = hdu_cls 

341 self._stream = stream 

342 

343 @classmethod 

344 def from_index_row(cls, index_row: np.void, stream: IO[bytes]) -> _ExtensionReader: 

345 """Construct from a row of the binary table index HDU. 

346 

347 Parameters 

348 ---------- 

349 index_row 

350 A record array row from the index HDU. 

351 stream 

352 The file-like object being used to read the full FITS file. 

353 

354 Returns 

355 ------- 

356 reader 

357 A reader object for the extension. 

358 """ 

359 match index_row["XTENSION"].strip(): 

360 case "IMAGE": 

361 hdu_cls = astropy.io.fits.ImageHDU 

362 case "BINTABLE": 

363 if index_row["ZIMAGE"]: 

364 hdu_cls = astropy.io.fits.CompImageHDU 

365 else: 

366 hdu_cls = astropy.io.fits.BinTableHDU 

367 case other: 

368 raise AssertionError(f"Unsupported HDU type {other!r}.") 

369 return _ExtensionReader( 

370 hdu_cls, 

371 _RangeStreamProxy( 

372 stream, 

373 start=int(index_row["HDRADDR"]), 

374 ), 

375 ) 

376 

377 @classmethod 

378 def from_bytes(cls, hdu_cls: type[ExtensionHDU], data: bytes) -> _ExtensionReader: 

379 """Construct from already-read `bytes` 

380 

381 Parameters 

382 ---------- 

383 hdu_cls 

384 The HDU type to instantiate. 

385 data 

386 Raw data for the HDU. 

387 

388 Returns 

389 ------- 

390 reader 

391 A reader object for extension. 

392 """ 

393 return _ExtensionReader(hdu_cls, io.BytesIO(data)) 

394 

395 @property 

396 def is_table(self) -> bool: 

397 """Whether this is logically a table HDU. 

398 

399 This is `False` for compressed image HDUs, even though they are 

400 represented in FITS as a binary table. 

401 """ 

402 return issubclass(self._hdu_cls, astropy.io.fits.BinTableHDU) and not issubclass( 

403 self._hdu_cls, astropy.io.fits.CompImageHDU 

404 ) 

405 

406 @cached_property 

407 def hdu(self) -> ExtensionHDU: 

408 """The Astropy HDU object.""" 

409 self._stream.seek(0) 

410 if self._hdu_cls is astropy.io.fits.CompImageHDU: 

411 # CompImageHDU.readfrom doesn't work; we need to make a minimal 

412 # example and report it upstream. Happily this workaround does 

413 # work. 

414 bintable_hdu = astropy.io.fits.BinTableHDU.readfrom(self._stream, memmap=False, cache=False) 

415 return self._hdu_cls(bintable=bintable_hdu) 

416 else: 

417 return self._hdu_cls.readfrom(self._stream, memmap=False, cache=False, uint=True) 

418 

419 @property 

420 def header(self) -> astropy.io.fits.Header: 

421 """The header of the HDU.""" 

422 return self.hdu.header 

423 

424 @property 

425 def data(self) -> np.ndarray: 

426 """The data for the HDU.""" 

427 return self.hdu.data 

428 

429 @property 

430 def section(self) -> astropy.io.fits.Section | astropy.io.fits.CompImageSection: 

431 """An Astropy expression object that reads a subset of the data when 

432 sliced. 

433 """ 

434 return self.hdu.section 

435 

436 

437class _RangeStreamProxy(IO[bytes]): 

438 """A readable IO proxy object that makes the beginning of the file appear 

439 at a custom position. 

440 

441 Parameters 

442 ---------- 

443 base 

444 Underlying readable, seekable buffer to proxy. 

445 start 

446 Offset into the base stream that will be considered the start of the 

447 proxy stream. 

448 

449 Notes 

450 ----- 

451 This class exists because Astropy doesn't seem to provide a way to read a 

452 single HDU that starts at the current seek position of a file-like object. 

453 It does provide ``readfrom`` methods on its HDU objects that take a 

454 file-like object, but these assume (possibly unintentionally; it only 

455 happens when Astropy is trying to see whether the file was opened for 

456 appending) that ``seek(0)`` will set the file-like object to the start of 

457 the HDU. 

458 """ 

459 

460 def __init__(self, base: IO[bytes], start: int): 

461 self._base = base 

462 self._start = start 

463 

464 @property 

465 def mode(self) -> str: 

466 return "rb" 

467 

468 def __enter__(self) -> Self: 

469 raise AssertionError("This proxy should not be used as a context manager.") 

470 

471 def __exit__(self, type: Any, value: Any, traceback: Any) -> None: 

472 raise AssertionError("This proxy should not be used as a context manager.") 

473 

474 def __iter__(self) -> Iterator[bytes]: 

475 return self._base.__iter__() 

476 

477 def __next__(self) -> bytes: 

478 return self._base.__next__() 

479 

480 def close(self) -> None: 

481 raise AssertionError("This proxy should not ever be closed.") 

482 

483 @property 

484 def closed(self) -> bool: 

485 return False 

486 

487 def fileno(self) -> int: 

488 raise OSError() 

489 

490 def flush(self) -> None: 

491 pass 

492 

493 def isatty(self) -> bool: 

494 return False 

495 

496 def read(self, n: int = -1, /) -> bytes: 

497 result = self._base.read(n) 

498 return result 

499 

500 def readable(self) -> bool: 

501 return True 

502 

503 def readline(self, limit: int = -1, /) -> bytes: 

504 return self._base.readline(limit) 

505 

506 def readlines(self, hint: int = -1, /) -> list[bytes]: 

507 return self._base.readlines(hint) 

508 

509 def seek(self, offset: int, whence: int = 0) -> int: 

510 match whence: 

511 case os.SEEK_SET: 

512 return self._base.seek(offset + self._start, os.SEEK_SET) - self._start 

513 case os.SEEK_CUR: 

514 return self._base.seek(offset, os.SEEK_CUR) - self._start 

515 case os.SEEK_END: 

516 return self._base.seek(offset, os.SEEK_END) - self._start 

517 raise TypeError(f"Invalid value for 'whence': {whence}.") 

518 

519 def seekable(self) -> bool: 

520 return True 

521 

522 def tell(self) -> int: 

523 return self._base.tell() - self._start 

524 

525 def truncate(self, size: int | None = None, /) -> int: 

526 raise OSError() 

527 

528 def writable(self) -> bool: 

529 return False 

530 

531 def write(self, arg: Any, /) -> int: 

532 raise OSError() 

533 

534 def writelines(self, arg: Any, /) -> None: 

535 raise OSError()