Coverage for python/lsst/pipe/base/quantum_graph/_multiblock.py: 38%

297 statements  

coverage.py v7.13.5, created at 2026-05-06 08:32 +0000

1# This file is part of pipe_base. 

2# 

3# Developed for the LSST Data Management System. 

4# This product includes software developed by the LSST Project 

5# (http://www.lsst.org). 

6# See the COPYRIGHT file at the top-level directory of this distribution 

7# for details of code ownership. 

8# 

9# This software is dual licensed under the GNU General Public License and also 

10# under a 3-clause BSD license. Recipients may choose which of these licenses 

11# to use; please see the files gpl-3.0.txt and/or bsd_license.txt, 

12# respectively. If you choose the GPL option then the following text applies 

13# (but note that there is still no warranty even if you opt for BSD instead): 

14# 

15# This program is free software: you can redistribute it and/or modify 

16# it under the terms of the GNU General Public License as published by 

17# the Free Software Foundation, either version 3 of the License, or 

18# (at your option) any later version. 

19# 

20# This program is distributed in the hope that it will be useful, 

21# but WITHOUT ANY WARRANTY; without even the implied warranty of 

22# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23# GNU General Public License for more details. 

24# 

25# You should have received a copy of the GNU General Public License 

26# along with this program. If not, see <http://www.gnu.org/licenses/>. 

27 

28from __future__ import annotations 

29 

30__all__ = ( 

31 "Address", 

32 "AddressReader", 

33 "AddressRow", 

34 "AddressWriter", 

35 "Compressor", 

36 "Decompressor", 

37 "InvalidQuantumGraphFileError", 

38 "MultiblockReader", 

39 "MultiblockWriter", 

40) 

41 

42import dataclasses 

43import logging 

44import tempfile 

45import uuid 

46import zipfile 

47from collections.abc import Iterator, Set 

48from contextlib import contextmanager 

49from io import BufferedReader, BytesIO 

50from operator import attrgetter 

51from typing import IO, Protocol, TypeVar 

52 

53import pydantic 

54 

55_LOG = logging.getLogger(__name__) 

56 

57 

58_T = TypeVar("_T", bound=pydantic.BaseModel) 

59 

60 

61type UUID_int = int 

62 

63MAX_UUID_INT: UUID_int = 2**128 

64 

65 

66DEFAULT_PAGE_SIZE: int = 5_000_000 

67"""Default page size for reading chunks of quantum graph files. 

68 

69This is intended to be large enough to avoid any possibility of individual 

70reads suffering from per-seek overheads, especially in network file access, 

71while still being small enough to only minimally slow down tiny reads of 

72individual quanta (especially for execution). 

73""" 

74 

75 

76class Compressor(Protocol): 

77 """A protocol for objects with a ``compress`` method that takes and returns 

78 `bytes`. 

79 """ 

80 

81 def compress(self, data: bytes) -> bytes: 

82 """Compress the given data. 

83 

84 Parameters 

85 ---------- 

86 data : `bytes` 

87 Uncompressed data. 

88 

89 Returns 

90 ------- 

91 compressed : `bytes` 

92 Compressed data. 

93 """ 

94 ... 

95 

96 

97class Decompressor(Protocol): 

98 """A protocol for objects with a `decompress` method that takes and returns 

99 `bytes`. 

100 """ 

101 

102 def decompress(self, data: bytes) -> bytes: 

103 """Decompress the given data. 

104 

105 Parameters 

106 ---------- 

107 data : `bytes` 

108 Compressed data. 

109 

110 Returns 

111 ------- 

112 decompressed : `bytes` 

113 Uncompressed data. 

114 """ 

115 ... 
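# Example sketch (not part of the original module): minimal objects that
# satisfy the Compressor and Decompressor protocols using the standard-library
# zlib module.  The class names here are hypothetical.
import zlib


class ZlibCompressor:
    def compress(self, data: bytes) -> bytes:
        # zlib.compress returns the complete compressed stream for the input.
        return zlib.compress(data)


class ZlibDecompressor:
    def decompress(self, data: bytes) -> bytes:
        return zlib.decompress(data)


# Round trip: compressing and then decompressing reproduces the original payload.
assert ZlibDecompressor().decompress(ZlibCompressor().compress(b"payload")) == b"payload"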

116 

117 

118class InvalidQuantumGraphFileError(RuntimeError): 

119 """An exception raised when a quantum graph file has internal 

120 inconsistencies or does not actually appear to be a quantum graph file. 

121 """ 

122 

123 

124@dataclasses.dataclass(slots=True) 

125class Address: 

126 """Struct that holds an address into a multi-block file.""" 

127 

128 offset: int = 0 

129 """Byte offset for the block.""" 

130 

131 size: int = 0 

132 """Size of the block. 

133 

134 This always includes the size of the tiny header that records the block

135 size. The size recorded in that header does not include the header itself,

136 so the two sizes differ by the ``int_size`` used to write the multi-block file.

137 

138 A size of zero is used (with, by convention, an offset of zero) to indicate 

139 an absent block. 

140 """ 

141 

142 def __str__(self) -> str: 

143 return f"{self.offset:06}[{self.size:06}]" 
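# Illustration (assumed numbers, not from the original file): with int_size=8,
# a 100-byte payload is stored as an 8-byte length header recording 100
# followed by the payload, so the corresponding Address has size=108 even
# though the embedded header records 100.
_example_int_size = 8
_example_payload = b"\x00" * 100
_example_block = len(_example_payload).to_bytes(_example_int_size) + _example_payload
assert len(_example_block) == 108
assert Address(offset=0, size=len(_example_block)).size - _example_int_size == len(_example_payload)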

144 

145 

146@dataclasses.dataclass(slots=True) 

147class AddressRow: 

148 """The in-memory representation of a single row in an address file.""" 

149 

150 key: uuid.UUID 

151 """Universally unique identifier for this row.""" 

152 

153 index: int 

154 """Monotonically increasing integer ID; unique within this file only.""" 

155 

156 addresses: list[Address] = dataclasses.field(default_factory=list) 

157 """Offsets and sizes into multi-block files.""" 

158 

159 def write(self, stream: IO[bytes], int_size: int) -> None: 

160 """Write this address row to a file-like object. 

161 

162 Parameters 

163 ---------- 

164 stream : `typing.IO` [ `bytes` ] 

165 Binary file-like object. 

166 int_size : `int` 

167 Number of bytes to use for all integers. 

168 """ 

169 stream.write(self.key.bytes) 

170 stream.write(self.index.to_bytes(int_size)) 

171 for address in self.addresses: 

172 stream.write(address.offset.to_bytes(int_size)) 

173 stream.write(address.size.to_bytes(int_size)) 

174 

175 @classmethod 

176 def read(cls, stream: IO[bytes], n_addresses: int, int_size: int) -> AddressRow: 

177 """Read this address row from a file-like object. 

178 

179 Parameters 

180 ---------- 

181 stream : `typing.IO` [ `bytes` ] 

182 Binary file-like object. 

183 n_addresses : `int` 

184 Number of addresses included in each row. 

185 int_size : `int` 

186 Number of bytes to use for all integers. 

187 """ 

188 key = uuid.UUID(int=int.from_bytes(stream.read(16))) 

189 index = int.from_bytes(stream.read(int_size)) 

190 row = AddressRow(key, index) 

191 for _ in range(n_addresses): 

192 offset = int.from_bytes(stream.read(int_size)) 

193 size = int.from_bytes(stream.read(int_size)) 

194 row.addresses.append(Address(offset, size)) 

195 return row 

196 

197 def __str__(self) -> str: 

198 return f"{self.key} {self.index:06} {' '.join(str(a) for a in self.addresses)}" 
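# Sketch (hypothetical values): a row occupies 16 bytes for the UUID, int_size
# bytes for the index, and 2 * int_size bytes per address, so a write/read
# round trip through an in-memory buffer reproduces the row exactly.
_example_row = AddressRow(uuid.uuid4(), 0, [Address(offset=0, size=42)])
_example_buffer = BytesIO()
_example_row.write(_example_buffer, int_size=8)
_example_buffer.seek(0)
assert AddressRow.read(_example_buffer, n_addresses=1, int_size=8) == _example_row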

199 

200 

201@dataclasses.dataclass 

202class AddressWriter: 

203 """A helper object for writing address files for multi-block files.""" 

204 

205 addresses: list[dict[uuid.UUID, Address]] = dataclasses.field(default_factory=list) 

206 """Addresses to store with each UUID. 

207 

208 Keys missing from one of these dictionaries are written with an empty

209 (zero) address for that multi-block file.

210 """ 

211 

212 def write(self, stream: IO[bytes], int_size: int, all_ids: Set[uuid.UUID] | None = None) -> None: 

213 """Write all addresses to a file-like object. 

214 

215 Parameters 

216 ---------- 

217 stream : `typing.IO` [ `bytes` ] 

218 Binary file-like object. 

219 int_size : `int` 

220 Number of bytes to use for all integers. 

221 all_ids : `~collections.abc.Set` [`uuid.UUID`], optional 

222 Union of all UUIDs in the address dictionaries, as returned by

223 `get_all_ids`. Computed automatically if not provided.

224 """ 

225 if all_ids is None: 

226 all_ids = self.get_all_ids() 

227 stream.write(int_size.to_bytes(1)) 

228 stream.write(len(all_ids).to_bytes(int_size)) 

229 stream.write(len(self.addresses).to_bytes(int_size)) 

230 empty_address = Address() 

231 for n, key in enumerate(sorted(all_ids, key=attrgetter("int"))): 

232 row = AddressRow(key, n, [m.get(key, empty_address) for m in self.addresses]) 

233 _LOG.debug("Wrote address %s.", row) 

234 row.write(stream, int_size) 

235 

236 def write_to_zip(self, zf: zipfile.ZipFile, name: str, int_size: int) -> None: 

237 """Write all addresses to a file in a zip archive. 

238 

239 Parameters 

240 ---------- 

241 zf : `zipfile.ZipFile` 

242 Zip archive to add the file to. 

243 name : `str` 

244 Base name for the address file; an extension will be added. 

245 int_size : `int` 

246 Number of bytes to use for all integers. 

247 """ 

248 all_ids = self.get_all_ids() 

249 zip_info = zipfile.ZipInfo(f"{name}.addr") 

250 row_size = AddressReader.compute_row_size(int_size, len(self.addresses)) 

251 zip_info.file_size = AddressReader.compute_header_size(int_size) + len(all_ids) * row_size 

252 with zf.open(zip_info, mode="w") as stream: 

253 self.write(stream, int_size=int_size, all_ids=all_ids) 

254 

255 def get_all_ids(self) -> Set[uuid.UUID]: 

256 """Return all IDs used by any address dictionary. 

257 

258 Returns 

259 ------- 

260 all_ids : `~collections.abc.Set` [`uuid.UUID`] 

261 Set of all IDs. 

262 """ 

263 all_ids: set[uuid.UUID] = set() 

264 for address_map in self.addresses: 

265 all_ids.update(address_map.keys()) 

266 return all_ids 
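# Usage sketch (hypothetical file path, base name, and IDs): collect one
# address dictionary per multi-block file, then write a single address file
# into the same zip archive.
def _example_write_addresses(path: str) -> None:
    writer = AddressWriter(addresses=[{}, {}])
    key = uuid.uuid4()
    writer.addresses[0][key] = Address(offset=0, size=24)
    # The second dictionary has no entry for this key, so its row is written
    # with an empty (zero) address in that column.
    with zipfile.ZipFile(path, mode="w") as zf:
        writer.write_to_zip(zf, "quanta", int_size=8)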

267 

268 

269@dataclasses.dataclass 

270class AddressPage: 

271 """A page of addresses in the `AddressReader`.""" 

272 

273 file_offset: int 

274 """Offset in bytes to this page from the beginning of the file.""" 

275 

276 begin: int 

277 """Index of the first row in this page.""" 

278 

279 n_rows: int 

280 """Number of rows in this page.""" 

281 

282 read: bool = False 

283 """Whether this page has already been read.""" 

284 

285 @property 

286 def end(self) -> int: 

287 """One past the last row index in this page.""" 

288 return self.begin + self.n_rows 

289 

290 

291@dataclasses.dataclass 

292class PageBounds: 

293 """A page index and the UUID interval that page covers.""" 

294 

295 page_index: int 

296 """Index into the page array.""" 

297 

298 uuid_int_begin: UUID_int 

299 """Integer representation of the smallest UUID in this page.""" 

300 

301 uuid_int_end: UUID_int 

302 """One larger than the integer representation of the largest UUID in this 

303 page. 

304 """ 

305 

306 def __str__(self) -> str: 

307 return f"{self.page_index} [{self.uuid_int_begin:x}:{self.uuid_int_end:x}]" 

308 

309 

310@dataclasses.dataclass 

311class AddressReader: 

312 """A helper object for reading address files for multi-block files.""" 

313 

314 stream: IO[bytes] 

315 """Stream to read from.""" 

316 

317 int_size: int 

318 """Size of each integer in bytes.""" 

319 

320 n_rows: int 

321 """Number of rows in the file.""" 

322 

323 n_addresses: int 

324 """Number of addresses in each row.""" 

325 

326 rows_per_page: int 

327 """Number of addresses in each page.""" 

328 

329 rows: dict[uuid.UUID, AddressRow] = dataclasses.field(default_factory=dict) 

330 """Rows that have already been read.""" 

331 

332 pages: list[AddressPage] = dataclasses.field(default_factory=list) 

333 """Descriptions of the file offsets and integer row indexes of pages and 

334 flags for whether they have been read already. 

335 """ 

336 

337 page_bounds: dict[int, PageBounds] = dataclasses.field(default_factory=dict) 

338 """Mapping from page index to page boundary information.""" 

339 

340 @classmethod 

341 def from_stream( 

342 cls, stream: IO[bytes], *, page_size: int, n_addresses: int, int_size: int 

343 ) -> AddressReader: 

344 """Construct from a stream by reading the header. 

345 

346 Parameters 

347 ---------- 

348 stream : `typing.IO` [ `bytes` ] 

349 File-like object to read from. 

350 page_size : `int` 

351 Approximate number of bytes to read at a time when searching for an 

352 address. 

353 n_addresses : `int` 

354 Number of addresses to expect per row. This is checked against 

355 the size embedded in the file. 

356 int_size : `int` 

357 Number of bytes to use for all integers. This is checked against 

358 the size embedded in the file. 

359 """ 

360 header_size = cls.compute_header_size(int_size) 

361 row_size = cls.compute_row_size(int_size, n_addresses) 

362 # Read the raw header page. 

363 header_page_data = stream.read(header_size) 

364 if len(header_page_data) < header_size: 

365 raise InvalidQuantumGraphFileError("Address file unexpectedly truncated.") 

366 # Interpret the raw header data and initialize the reader instance. 

367 header_page_stream = BytesIO(header_page_data) 

368 file_int_size = int.from_bytes(header_page_stream.read(1)) 

369 if file_int_size != int_size: 

370 raise InvalidQuantumGraphFileError( 

371 f"int size in address file ({file_int_size}) does not match int size in header ({int_size})." 

372 ) 

373 n_rows = int.from_bytes(header_page_stream.read(int_size)) 

374 file_n_addresses = int.from_bytes(header_page_stream.read(int_size)) 

375 if file_n_addresses != n_addresses: 

376 raise InvalidQuantumGraphFileError( 

377 f"Incorrect number of addresses per row: expected {n_addresses}, got {file_n_addresses}." 

378 ) 

379 rows_per_page = max(page_size // row_size, 1) 

380 # Construct an instance. 

381 self = cls(stream, int_size, n_rows, n_addresses, rows_per_page=rows_per_page) 

382 # Calculate positions of each page of rows. 

383 row_index = 0 

384 file_offset = header_size 

385 while row_index < n_rows: 

386 self.pages.append(AddressPage(file_offset=file_offset, begin=row_index, n_rows=rows_per_page)) 

387 row_index += rows_per_page 

388 file_offset += rows_per_page * row_size 

389 if row_index != n_rows: 

390 # Last page was too big. 

391 self.pages[-1].n_rows -= row_index - n_rows 

392 assert sum(p.n_rows for p in self.pages) == n_rows, "Bad logic setting page row counts." 

393 return self 

394 

395 @classmethod 

396 @contextmanager 

397 def open_in_zip( 

398 cls, 

399 zf: zipfile.ZipFile, 

400 name: str, 

401 *, 

402 page_size: int, 

403 n_addresses: int, 

404 int_size: int, 

405 ) -> Iterator[AddressReader]: 

406 """Make a reader for an address file in a zip archive. 

407 

408 Parameters 

409 ---------- 

410 zf : `zipfile.ZipFile` 

411 Zip archive to read the file from. 

412 name : `str` 

413 Base name for the address file; an extension will be added. 

414 page_size : `int` 

415 Approximate number of bytes to read at a time when searching for an 

416 address. 

417 n_addresses : `int` 

418 Number of addresses to expect per row. This is checked against 

419 the size embedded in the file. 

420 int_size : `int` 

421 Number of bytes to use for all integers. This is checked against 

422 the size embedded in the file. 

423 

424 Returns 

425 ------- 

426 reader : `contextlib.AbstractContextManager` [ `AddressReader` ] 

427 Context manager that returns a reader when entered. 

428 """ 

429 with zf.open(f"{name}.addr", mode="r") as stream: 

430 yield cls.from_stream(stream, page_size=page_size, n_addresses=n_addresses, int_size=int_size) 

431 

432 @staticmethod 

433 def compute_header_size(int_size: int) -> int: 

434 """Return the size (in bytes) of the header of an address file. 

435 

436 Parameters 

437 ---------- 

438 int_size : `int` 

439 Size of each integer in bytes. 

440 

441 Returns 

442 ------- 

443 size : `int` 

444 Size of the header in bytes. 

445 """ 

446 return ( 

447 1 # int_size 

448 + int_size # number of rows 

449 + int_size # number of addresses in each row 

450 ) 

451 

452 @staticmethod 

453 def compute_row_size(int_size: int, n_addresses: int) -> int: 

454 """Return the size (in bytes) of each row of an address file. 

455 

456 Parameters 

457 ---------- 

458 int_size : `int` 

459 Size of each integer in bytes. 

460 n_addresses : `int` 

461 Number of addresses in each row. 

462 

463 Returns 

464 ------- 

465 size : `int` 

466 Size of each row in bytes. 

467 """ 

468 return ( 

469 16 # uuid 

470 + int_size 

471 * ( 

472 1 # index 

473 + 2 * n_addresses 

474 ) 

475 ) 

476 

477 @property 

478 def row_size(self) -> int: 

479 """The size (in bytes) of each row of this address file.""" 

480 return self.compute_row_size(self.int_size, self.n_addresses) 

481 

482 def read_all(self) -> dict[uuid.UUID, AddressRow]: 

483 """Read all addresses in the file. 

484 

485 Returns 

486 ------- 

487 rows : `dict` [ `uuid.UUID`, `AddressRow` ] 

488 Mapping of loaded address rows, keyed by UUID. 

489 """ 

490 # Skip any pages from the beginning that have already been read; this

491 # handles the cases where we already read everything and where there was

492 # nothing to read, while also giving us a page with a file offset to

493 # start from.

494 for page in self.pages: 

495 if not page.read: 

496 break 

497 else: 

498 return self.rows 

499 # Read the entire rest of the file into memory. 

500 self.stream.seek(page.file_offset) 

501 data = self.stream.read() 

502 buffer = BytesIO(data) 

503 # Shortcut out if we've already read everything, but don't bother 

504 # optimizing previous partial reads. 

505 while len(self.rows) < self.n_rows: 

506 self._read_row(buffer) 

507 # Delete all pages; they don't matter anymore, and that's easier than 

508 # updating them to reflect the reads we've done. 

509 self.pages.clear() 

510 return self.rows 

511 

512 def find(self, key: uuid.UUID) -> AddressRow: 

513 """Read the row for the given UUID or integer index. 

514 

515 Parameters 

516 ---------- 

517 key : `uuid.UUID` 

518 UUID to find. 

519 

520 Returns 

521 ------- 

522 row : `AddressRow` 

523 Addresses for the given UUID. 

524 """ 

525 if (row := self.rows.get(key)) is not None: 

526 return row 

527 if self.n_rows == 0 or not self.pages: 

528 raise LookupError(f"Address for {key} not found.") 

529 

530 # Use a binary search to find the page containing the target UUID. 

531 left = 0 

532 right = len(self.pages) - 1 

533 while left <= right: 

534 mid = left + ((right - left) // 2) 

535 self._read_page(mid) 

536 if (row := self.rows.get(key)) is not None: 

537 return row 

538 bounds = self.page_bounds[mid] 

539 if key.int < bounds.uuid_int_begin: 

540 right = mid - 1 

541 elif key.int >= bounds.uuid_int_end:  # uuid_int_end is exclusive

542 left = mid + 1 

543 else: 

544 # Should have been on this page, but it wasn't. 

545 raise LookupError(f"Address for {key} not found.") 

546 

547 # Ran out of pages to search. 

548 raise LookupError(f"Address for {key} not found.") 

549 

550 def _read_page(self, page_index: int, page_stream: BytesIO | None = None) -> bool: 

551 page = self.pages[page_index] 

552 if page.read: 

553 return False 

554 if page_stream is None: 

555 self.stream.seek(page.file_offset) 

556 page_stream = BytesIO(self.stream.read(page.n_rows * self.row_size)) 

557 row = self._read_row(page_stream) 

558 uuid_int_begin = row.key.int 

559 for _ in range(1, page.n_rows): 

560 row = self._read_row(page_stream) 

561 uuid_int_end = row.key.int + 1 # Python's loop scoping rules are actually useful here! 

562 page.read = True 

563 bounds = PageBounds(page_index=page_index, uuid_int_begin=uuid_int_begin, uuid_int_end=uuid_int_end) 

564 self.page_bounds[page_index] = bounds 

565 _LOG.debug("Read page %s with rows [%s:%s].", bounds, page.begin, page.end) 

566 return True 

567 

568 def _read_row(self, page_stream: BytesIO) -> AddressRow: 

569 row = AddressRow.read(page_stream, self.n_addresses, self.int_size) 

570 self.rows[row.key] = row 

571 _LOG.debug("Read address row %s.", row) 

572 return row 
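# Usage sketch (hypothetical names, matching the writer sketch above): open the
# address file and look up a single UUID.  With int_size=8 and two addresses
# per row, the header is 1 + 8 + 8 = 17 bytes and each row is
# 16 + 8 * (1 + 2 * 2) = 56 bytes, so find() only pages in the parts of the
# file that the binary search visits.
def _example_find_address(path: str, key: uuid.UUID) -> AddressRow:
    with zipfile.ZipFile(path, mode="r") as zf:
        with AddressReader.open_in_zip(
            zf, "quanta", page_size=DEFAULT_PAGE_SIZE, n_addresses=2, int_size=8
        ) as reader:
            return reader.find(key)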

573 

574 

575@dataclasses.dataclass 

576class MultiblockWriter: 

577 """A helper object for writing multi-block files.""" 

578 

579 stream: IO[bytes] 

580 """A binary file-like object to write to.""" 

581 

582 int_size: int 

583 """Number of bytes to use for all integers.""" 

584 

585 file_size: int = 0 

586 """Running size of the full file.""" 

587 

588 addresses: dict[uuid.UUID, Address] = dataclasses.field(default_factory=dict) 

589 """Running map of all addresses added to the file so far. 

590 

591 When the multi-block file is fully written, this is appended to the 

592 `AddressWriter.addresses` to write the corresponding address file. 

593 """ 

594 

595 @classmethod 

596 @contextmanager 

597 def open_in_zip( 

598 cls, zf: zipfile.ZipFile, name: str, int_size: int, use_tempfile: bool = False 

599 ) -> Iterator[MultiblockWriter]: 

600 """Open a writer for a file in a zip archive. 

601 

602 Parameters 

603 ---------- 

604 zf : `zipfile.ZipFile` 

605 Zip archive to add the file to. 

606 name : `str` 

607 Base name for the multi-block file; an extension will be added. 

608 int_size : `int` 

609 Number of bytes to use for all integers. 

610 use_tempfile : `bool`, optional 

611 If `True`, send writes to a temporary file and only add the file to 

612 the zip archive when the context manager closes. This involves 

613 more overall I/O, but it permits multiple multi-block files to be 

614 open for writing in the same zip archive at once. 

615 

616 Returns 

617 ------- 

618 writer : `contextlib.AbstractContextManager` [ `MultiblockWriter` ] 

619 Context manager that returns a writer when entered. 

620 """ 

621 filename = f"{name}.mb" 

622 if use_tempfile: 

623 with tempfile.NamedTemporaryFile(suffix=filename) as tmp: 

624 yield MultiblockWriter(tmp, int_size) 

625 tmp.flush() 

626 zf.write(tmp.name, filename) 

627 else: 

628 with zf.open(f"{name}.mb", mode="w", force_zip64=True) as stream: 

629 yield MultiblockWriter(stream, int_size) 

630 

631 def write_bytes(self, id: uuid.UUID, data: bytes) -> Address: 

632 """Write raw bytes to the multi-block file. 

633 

634 Parameters 

635 ---------- 

636 id : `uuid.UUID` 

637 Unique ID of the object described by this block. 

638 data : `bytes` 

639 Data to store directly. 

640 

641 Returns 

642 ------- 

643 address : `Address` 

644 Address of the bytes just written. 

645 """ 

646 assert id not in self.addresses, "Duplicate write to multi-block file detected." 

647 self.stream.write(len(data).to_bytes(self.int_size)) 

648 self.stream.write(data) 

649 block_size = len(data) + self.int_size 

650 address = Address(offset=self.file_size, size=block_size) 

651 self.file_size += block_size 

652 self.addresses[id] = address 

653 return address 

654 

655 def write_model(self, id: uuid.UUID, model: pydantic.BaseModel, compressor: Compressor) -> Address: 

656 """Write raw bytes to the multi-block file. 

657 

658 Parameters 

659 ---------- 

660 id : `uuid.UUID` 

661 Unique ID of the object described by this block. 

662 model : `pydantic.BaseModel` 

663 Model to convert to JSON and compress. 

664 compressor : `Compressor` 

665 Object with a ``compress`` method that takes and returns `bytes`. 

666 

667 Returns 

668 ------- 

669 address : `Address` 

670 Address of the bytes just written. 

671 """ 

672 json_data = model.model_dump_json().encode() 

673 compressed_data = compressor.compress(json_data) 

674 return self.write_bytes(id, compressed_data) 
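# Usage sketch (hypothetical model mapping and base name; ZlibCompressor is the
# hypothetical compressor sketched earlier): write one compressed JSON block
# per UUID to a multi-block file in a zip archive, then record the resulting
# addresses in a matching address file.
def _example_write_models(path: str, models: dict[uuid.UUID, pydantic.BaseModel]) -> None:
    compressor = ZlibCompressor()
    with zipfile.ZipFile(path, mode="w") as zf:
        with MultiblockWriter.open_in_zip(zf, "payloads", int_size=8) as mb_writer:
            for key, model in models.items():
                mb_writer.write_model(key, model, compressor)
        # The writer's accumulated addresses become one column of the address file.
        address_writer = AddressWriter(addresses=[mb_writer.addresses])
        address_writer.write_to_zip(zf, "payloads", int_size=8)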

675 

676 

677@dataclasses.dataclass 

678class MultiblockReader: 

679 """A helper object for reader multi-block files.""" 

680 

681 stream: IO[bytes] 

682 """A binary file-like object to read from.""" 

683 

684 int_size: int 

685 """Number of bytes to use for all integers.""" 

686 

687 @classmethod 

688 @contextmanager 

689 def open_in_zip(cls, zf: zipfile.ZipFile, name: str, *, int_size: int) -> Iterator[MultiblockReader]: 

690 """Open a reader for a file in a zip archive. 

691 

692 Parameters 

693 ---------- 

694 zf : `zipfile.ZipFile` 

695 Zip archive to read the file from. 

696 name : `str` 

697 Base name for the multi-block file; an extension will be added. 

698 int_size : `int` 

699 Number of bytes to use for all integers. 

700 

701 Returns 

702 ------- 

703 reader : `contextlib.AbstractContextManager` [ `MultiblockReader` ] 

704 Context manager that returns a reader when entered. 

705 """ 

706 with zf.open(f"{name}.mb", mode="r") as stream: 

707 yield MultiblockReader(stream, int_size) 

708 

709 @classmethod 

710 def read_all_bytes_in_zip( 

711 cls, zf: zipfile.ZipFile, name: str, *, int_size: int, page_size: int 

712 ) -> Iterator[bytes]: 

713 """Iterate over all of the byte blocks in a file in a zip archive. 

714 

715 Parameters 

716 ---------- 

717 zf : `zipfile.ZipFile` 

718 Zip archive to read the file from. 

719 name : `str` 

720 Base name for the multi-block file; an extension will be added. 

721 int_size : `int` 

722 Number of bytes to use for all integers. 

723 page_size : `int` 

724 Approximate number of bytes to read at a time. 

725 

726 Returns 

727 ------- 

728 byte_iter : `~collections.abc.Iterator` [ `bytes` ] 

729 Iterator over blocks. 

730 """ 

731 with zf.open(f"{name}.mb", mode="r") as zf_stream: 

732 # The standard library typing of IO[bytes] tiers isn't consistent. 

733 buffered_stream = BufferedReader(zf_stream, buffer_size=page_size)  # type: ignore[type-var]

734 size_data = buffered_stream.read(int_size) 

735 while size_data: 

736 internal_size = int.from_bytes(size_data) 

737 yield buffered_stream.read(internal_size) 

738 size_data = buffered_stream.read(int_size) 

739 

740 @classmethod 

741 def read_all_models_in_zip( 

742 cls, 

743 zf: zipfile.ZipFile, 

744 name: str, 

745 model_type: type[_T], 

746 decompressor: Decompressor, 

747 *, 

748 int_size: int, 

749 page_size: int, 

750 ) -> Iterator[_T]: 

751 """Iterate over all of the models in a file in a zip archive. 

752 

753 Parameters 

754 ---------- 

755 zf : `zipfile.ZipFile` 

756 Zip archive to read the file from. 

757 name : `str` 

758 Base name for the multi-block file; an extension will be added. 

759 model_type : `type` [ `pydantic.BaseModel` ] 

760 Pydantic model to validate JSON with. 

761 decompressor : `Decompressor` 

762 Object with a ``decompress`` method that takes and returns `bytes`. 

763 int_size : `int` 

764 Number of bytes to use for all integers. 

765 page_size : `int` 

766 Approximate number of bytes to read at a time. 

767 

768 Returns 

769 ------- 

770 model_iter : `~collections.abc.Iterator` [ `pydantic.BaseModel` ] 

771 Iterator over model instances. 

772 """ 

773 for compressed_data in cls.read_all_bytes_in_zip(zf, name, int_size=int_size, page_size=page_size): 

774 json_data = decompressor.decompress(compressed_data) 

775 yield model_type.model_validate_json(json_data) 

776 

777 def read_bytes(self, address: Address) -> bytes | None: 

778 """Read raw bytes from the multi-block file. 

779 

780 Parameters 

781 ---------- 

782 address : `Address` 

783 Offset and size of the data to read. 

784 

785 Returns 

786 ------- 

787 data : `bytes` or `None` 

788 Data read directly, or `None` if the address has zero size. 

789 """ 

790 if not address.size: 

791 return None 

792 self.stream.seek(address.offset) 

793 data = self.stream.read(address.size) 

794 internal_size = int.from_bytes(data[: self.int_size]) 

795 data = data[self.int_size :] 

796 if len(data) != internal_size: 

797 raise InvalidQuantumGraphFileError( 

798 f"Internal size {internal_size} does not match loaded data size {len(data)}." 

799 ) 

800 return data 

801 

802 def read_model(self, address: Address, model_type: type[_T], decompressor: Decompressor) -> _T | None: 

803 """Read a single compressed JSON block. 

804 

805 Parameters 

806 ---------- 

807 address : `Address` 

808 Size and offset of the block. 

809 model_type : `type` [ `pydantic.BaseModel` ] 

810 Pydantic model to validate JSON with. 

811 decompressor : `Decompressor` 

812 Object with a ``decompress`` method that takes and returns `bytes`. 

813 

814 Returns 

815 ------- 

816 model : `pydantic.BaseModel` or `None`

817 Validated model, or `None` if the address has zero size.

818 """ 

819 compressed_data = self.read_bytes(address) 

820 if compressed_data is None: 

821 return None 

822 json_data = decompressor.decompress(compressed_data) 

823 return model_type.model_validate_json(json_data)
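# End-to-end read sketch (hypothetical names, matching the writer sketch above;
# ZlibDecompressor is the hypothetical decompressor sketched earlier): locate a
# block through the address file, then decompress and validate it as a model.
def _example_read_model(path: str, key: uuid.UUID, model_type: type[_T]) -> _T | None:
    decompressor = ZlibDecompressor()
    with zipfile.ZipFile(path, mode="r") as zf:
        with AddressReader.open_in_zip(
            zf, "payloads", page_size=DEFAULT_PAGE_SIZE, n_addresses=1, int_size=8
        ) as addr_reader:
            row = addr_reader.find(key)
        with MultiblockReader.open_in_zip(zf, "payloads", int_size=8) as mb_reader:
            return mb_reader.read_model(row.addresses[0], model_type, decompressor)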