Coverage for python/lsst/pipe/base/quantum_graph/_multiblock.py: 38% (297 statements)
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This software is dual licensed under the GNU General Public License and also
# under a 3-clause BSD license. Recipients may choose which of these licenses
# to use; please see the files gpl-3.0.txt and/or bsd_license.txt,
# respectively. If you choose the GPL option then the following text applies
# (but note that there is still no warranty even if you opt for BSD instead):
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = (
    "Address",
    "AddressReader",
    "AddressRow",
    "AddressWriter",
    "Compressor",
    "Decompressor",
    "InvalidQuantumGraphFileError",
    "MultiblockReader",
    "MultiblockWriter",
)

import dataclasses
import logging
import tempfile
import uuid
import zipfile
from collections.abc import Iterator, Set
from contextlib import contextmanager
from io import BufferedReader, BytesIO
from operator import attrgetter
from typing import IO, Protocol, TypeVar

import pydantic

_LOG = logging.getLogger(__name__)

_T = TypeVar("_T", bound=pydantic.BaseModel)

type UUID_int = int

MAX_UUID_INT: UUID_int = 2**128

DEFAULT_PAGE_SIZE: int = 5_000_000
"""Default page size for reading chunks of quantum graph files.

This is intended to be large enough to avoid any possibility of individual
reads suffering from per-seek overheads, especially in network file access,
while still being small enough to only minimally slow down tiny reads of
individual quanta (especially for execution).
"""


class Compressor(Protocol):
    """A protocol for objects with a ``compress`` method that takes and
    returns `bytes`.
    """

    def compress(self, data: bytes) -> bytes:
        """Compress the given data.

        Parameters
        ----------
        data : `bytes`
            Uncompressed data.

        Returns
        -------
        compressed : `bytes`
            Compressed data.
        """
        ...


class Decompressor(Protocol):
    """A protocol for objects with a ``decompress`` method that takes and
    returns `bytes`.
    """

    def decompress(self, data: bytes) -> bytes:
        """Decompress the given data.

        Parameters
        ----------
        data : `bytes`
            Compressed data.

        Returns
        -------
        decompressed : `bytes`
            Uncompressed data.
        """
        ...
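

# Illustrative note (not part of this module's API): any object whose
# ``compress`` / ``decompress`` methods take and return `bytes` satisfies
# these protocols structurally. For example, the standard-library `zlib`
# module qualifies as both::
#
#     import zlib
#
#     payload = zlib.compress(b'{"quanta": []}')
#     assert zlib.decompress(payload) == b'{"quanta": []}'
#
# Third-party codecs such as ``zstandard.ZstdCompressor()`` and
# ``zstandard.ZstdDecompressor()`` expose the same method shapes; which codec
# this package actually uses is decided by the callers of this module and is
# not assumed here.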


class InvalidQuantumGraphFileError(RuntimeError):
    """An exception raised when a quantum graph file has internal
    inconsistencies or does not actually appear to be a quantum graph file.
    """


@dataclasses.dataclass(slots=True)
class Address:
    """Struct that holds an address into a multi-block file."""

    offset: int = 0
    """Byte offset for the block."""

    size: int = 0
    """Size of the block.

    This always includes the size of the tiny header that records the block
    size. The size recorded in that header does not include the header itself,
    so the two sizes differ by the ``int_size`` used to write the multi-block
    file.

    A size of zero is used (with, by convention, an offset of zero) to
    indicate an absent block.
    """

    def __str__(self) -> str:
        return f"{self.offset:06}[{self.size:06}]"
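

# On-disk block layout (illustrative): a block written at ``Address.offset``
# begins with an ``int_size``-byte big-endian payload length (big-endian is
# the `int.to_bytes` default used by the writer below), followed by the
# payload itself, so ``Address.size == int_size + len(payload)``. For example,
# a 40-byte payload written with ``int_size=8`` occupies 48 bytes.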


@dataclasses.dataclass(slots=True)
class AddressRow:
    """The in-memory representation of a single row in an address file."""

    key: uuid.UUID
    """Universally unique identifier for this row."""

    index: int
    """Monotonically increasing integer ID; unique within this file only."""

    addresses: list[Address] = dataclasses.field(default_factory=list)
    """Offsets and sizes into multi-block files."""

    def write(self, stream: IO[bytes], int_size: int) -> None:
        """Write this address row to a file-like object.

        Parameters
        ----------
        stream : `typing.IO` [ `bytes` ]
            Binary file-like object.
        int_size : `int`
            Number of bytes to use for all integers.
        """
        stream.write(self.key.bytes)
        stream.write(self.index.to_bytes(int_size))
        for address in self.addresses:
            stream.write(address.offset.to_bytes(int_size))
            stream.write(address.size.to_bytes(int_size))

    @classmethod
    def read(cls, stream: IO[bytes], n_addresses: int, int_size: int) -> AddressRow:
        """Read an address row from a file-like object.

        Parameters
        ----------
        stream : `typing.IO` [ `bytes` ]
            Binary file-like object.
        n_addresses : `int`
            Number of addresses included in each row.
        int_size : `int`
            Number of bytes to use for all integers.

        Returns
        -------
        row : `AddressRow`
            Address row read from the stream.
        """
        key = uuid.UUID(int=int.from_bytes(stream.read(16)))
        index = int.from_bytes(stream.read(int_size))
        row = AddressRow(key, index)
        for _ in range(n_addresses):
            offset = int.from_bytes(stream.read(int_size))
            size = int.from_bytes(stream.read(int_size))
            row.addresses.append(Address(offset, size))
        return row

    def __str__(self) -> str:
        return f"{self.key} {self.index:06} {' '.join(str(a) for a in self.addresses)}"
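

# Round-trip sketch (illustrative only): a row serializes to
# ``16 + int_size * (1 + 2 * len(addresses))`` bytes, e.g. 56 bytes for two
# addresses with ``int_size=8``::
#
#     from io import BytesIO
#     import uuid
#
#     row = AddressRow(uuid.uuid4(), 0, [Address(0, 40), Address(40, 24)])
#     buffer = BytesIO()
#     row.write(buffer, int_size=8)
#     assert buffer.tell() == 56
#     buffer.seek(0)
#     assert AddressRow.read(buffer, n_addresses=2, int_size=8) == row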


@dataclasses.dataclass
class AddressWriter:
    """A helper object for writing address files for multi-block files."""

    addresses: list[dict[uuid.UUID, Address]] = dataclasses.field(default_factory=list)
    """Addresses to store with each UUID.

    Every key in one of these dictionaries must have an entry in ``indices``.
    The converse is not true.
    """

    def write(self, stream: IO[bytes], int_size: int, all_ids: Set[uuid.UUID] | None = None) -> None:
        """Write all addresses to a file-like object.

        Parameters
        ----------
        stream : `typing.IO` [ `bytes` ]
            Binary file-like object.
        int_size : `int`
            Number of bytes to use for all integers.
        all_ids : `~collections.abc.Set` [`uuid.UUID`], optional
            Union of all UUIDs appearing in any of the address dictionaries,
            as returned by `get_all_ids`. Computed internally if not provided.
        """
        if all_ids is None:
            all_ids = self.get_all_ids()
        stream.write(int_size.to_bytes(1))
        stream.write(len(all_ids).to_bytes(int_size))
        stream.write(len(self.addresses).to_bytes(int_size))
        empty_address = Address()
        for n, key in enumerate(sorted(all_ids, key=attrgetter("int"))):
            row = AddressRow(key, n, [m.get(key, empty_address) for m in self.addresses])
            _LOG.debug("Wrote address %s.", row)
            row.write(stream, int_size)

    def write_to_zip(self, zf: zipfile.ZipFile, name: str, int_size: int) -> None:
        """Write all addresses to a file in a zip archive.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Zip archive to add the file to.
        name : `str`
            Base name for the address file; an extension will be added.
        int_size : `int`
            Number of bytes to use for all integers.
        """
        all_ids = self.get_all_ids()
        zip_info = zipfile.ZipInfo(f"{name}.addr")
        row_size = AddressReader.compute_row_size(int_size, len(self.addresses))
        zip_info.file_size = AddressReader.compute_header_size(int_size) + len(all_ids) * row_size
        with zf.open(zip_info, mode="w") as stream:
            self.write(stream, int_size=int_size, all_ids=all_ids)

    def get_all_ids(self) -> Set[uuid.UUID]:
        """Return all IDs used by any address dictionary.

        Returns
        -------
        all_ids : `~collections.abc.Set` [`uuid.UUID`]
            Set of all IDs.
        """
        all_ids: set[uuid.UUID] = set()
        for address_map in self.addresses:
            all_ids.update(address_map.keys())
        return all_ids
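

# Address-file format sketch (illustrative; the values below are assumptions,
# not defaults enforced here): the header is a one-byte ``int_size`` followed
# by the row count and the per-row address count, and the body is one
# fixed-size `AddressRow` per UUID in ascending UUID order::
#
#     from io import BytesIO
#     import uuid
#
#     writer = AddressWriter([{uuid.uuid4(): Address(0, 40)}])
#     buffer = BytesIO()
#     writer.write(buffer, int_size=8)
#     # 1 + 2 * 8 = 17 byte header, then one 16 + 8 * (1 + 2) = 40 byte row.
#     assert buffer.tell() == 17 + 40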


@dataclasses.dataclass
class AddressPage:
    """A page of addresses in the `AddressReader`."""

    file_offset: int
    """Offset in bytes to this page from the beginning of the file."""

    begin: int
    """Index of the first row in this page."""

    n_rows: int
    """Number of rows in this page."""

    read: bool = False
    """Whether this page has already been read."""

    @property
    def end(self) -> int:
        """One past the last row index in this page."""
        return self.begin + self.n_rows


@dataclasses.dataclass
class PageBounds:
    """A page index and the UUID interval that page covers."""

    page_index: int
    """Index into the page array."""

    uuid_int_begin: UUID_int
    """Integer representation of the smallest UUID in this page."""

    uuid_int_end: UUID_int
    """One larger than the integer representation of the largest UUID in this
    page.
    """

    def __str__(self) -> str:
        return f"{self.page_index} [{self.uuid_int_begin:x}:{self.uuid_int_end:x}]"


@dataclasses.dataclass
class AddressReader:
    """A helper object for reading address files for multi-block files."""

    stream: IO[bytes]
    """Stream to read from."""

    int_size: int
    """Size of each integer in bytes."""

    n_rows: int
    """Number of rows in the file."""

    n_addresses: int
    """Number of addresses in each row."""

    rows_per_page: int
    """Number of rows in each page."""

    rows: dict[uuid.UUID, AddressRow] = dataclasses.field(default_factory=dict)
    """Rows that have already been read."""

    pages: list[AddressPage] = dataclasses.field(default_factory=list)
    """Descriptions of the file offsets and integer row indexes of pages and
    flags for whether they have been read already.
    """

    page_bounds: dict[int, PageBounds] = dataclasses.field(default_factory=dict)
    """Mapping from page index to page boundary information."""

    @classmethod
    def from_stream(
        cls, stream: IO[bytes], *, page_size: int, n_addresses: int, int_size: int
    ) -> AddressReader:
        """Construct from a stream by reading the header.

        Parameters
        ----------
        stream : `typing.IO` [ `bytes` ]
            File-like object to read from.
        page_size : `int`
            Approximate number of bytes to read at a time when searching for an
            address.
        n_addresses : `int`
            Number of addresses to expect per row. This is checked against
            the size embedded in the file.
        int_size : `int`
            Number of bytes to use for all integers. This is checked against
            the size embedded in the file.

        Returns
        -------
        reader : `AddressReader`
            Reader initialized from the stream's header.
        """
        header_size = cls.compute_header_size(int_size)
        row_size = cls.compute_row_size(int_size, n_addresses)
        # Read the raw header page.
        header_page_data = stream.read(header_size)
        if len(header_page_data) < header_size:
            raise InvalidQuantumGraphFileError("Address file unexpectedly truncated.")
        # Interpret the raw header data and initialize the reader instance.
        header_page_stream = BytesIO(header_page_data)
        file_int_size = int.from_bytes(header_page_stream.read(1))
        if file_int_size != int_size:
            raise InvalidQuantumGraphFileError(
                f"int size in address file ({file_int_size}) does not match int size in header ({int_size})."
            )
        n_rows = int.from_bytes(header_page_stream.read(int_size))
        file_n_addresses = int.from_bytes(header_page_stream.read(int_size))
        if file_n_addresses != n_addresses:
            raise InvalidQuantumGraphFileError(
                f"Incorrect number of addresses per row: expected {n_addresses}, got {file_n_addresses}."
            )
        rows_per_page = max(page_size // row_size, 1)
        # Construct an instance.
        self = cls(stream, int_size, n_rows, n_addresses, rows_per_page=rows_per_page)
        # Calculate positions of each page of rows.
        row_index = 0
        file_offset = header_size
        while row_index < n_rows:
            self.pages.append(AddressPage(file_offset=file_offset, begin=row_index, n_rows=rows_per_page))
            row_index += rows_per_page
            file_offset += rows_per_page * row_size
        if row_index != n_rows:
            # Last page was too big.
            self.pages[-1].n_rows -= row_index - n_rows
        assert sum(p.n_rows for p in self.pages) == n_rows, "Bad logic setting page row counts."
        return self
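
    # Illustrative paging arithmetic (assumed values, not defaults enforced
    # here): with int_size=8 and n_addresses=2 each row is 56 bytes, so
    # page_size=5_000_000 gives rows_per_page=89_285; a file with
    # n_rows=200_000 is then split into pages of 89_285, 89_285, and 21_430
    # rows, matching the last-page adjustment in ``from_stream`` above.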

    @classmethod
    @contextmanager
    def open_in_zip(
        cls,
        zf: zipfile.ZipFile,
        name: str,
        *,
        page_size: int,
        n_addresses: int,
        int_size: int,
    ) -> Iterator[AddressReader]:
        """Make a reader for an address file in a zip archive.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Zip archive to read the file from.
        name : `str`
            Base name for the address file; an extension will be added.
        page_size : `int`
            Approximate number of bytes to read at a time when searching for an
            address.
        n_addresses : `int`
            Number of addresses to expect per row. This is checked against
            the size embedded in the file.
        int_size : `int`
            Number of bytes to use for all integers. This is checked against
            the size embedded in the file.

        Returns
        -------
        reader : `contextlib.AbstractContextManager` [ `AddressReader` ]
            Context manager that returns a reader when entered.
        """
        with zf.open(f"{name}.addr", mode="r") as stream:
            yield cls.from_stream(stream, page_size=page_size, n_addresses=n_addresses, int_size=int_size)

    @staticmethod
    def compute_header_size(int_size: int) -> int:
        """Return the size (in bytes) of the header of an address file.

        Parameters
        ----------
        int_size : `int`
            Size of each integer in bytes.

        Returns
        -------
        size : `int`
            Size of the header in bytes.
        """
        return (
            1  # int_size
            + int_size  # number of rows
            + int_size  # number of addresses in each row
        )

    @staticmethod
    def compute_row_size(int_size: int, n_addresses: int) -> int:
        """Return the size (in bytes) of each row of an address file.

        Parameters
        ----------
        int_size : `int`
            Size of each integer in bytes.
        n_addresses : `int`
            Number of addresses in each row.

        Returns
        -------
        size : `int`
            Size of each row in bytes.
        """
        return (
            16  # uuid
            + int_size
            * (
                1  # index
                + 2 * n_addresses
            )
        )

    @property
    def row_size(self) -> int:
        """The size (in bytes) of each row of this address file."""
        return self.compute_row_size(self.int_size, self.n_addresses)

    def read_all(self) -> dict[uuid.UUID, AddressRow]:
        """Read all addresses in the file.

        Returns
        -------
        rows : `dict` [ `uuid.UUID`, `AddressRow` ]
            Mapping of loaded address rows, keyed by UUID.
        """
        # Skip any pages from the beginning that have already been read; this
        # handles both the case where we already read everything (or there was
        # nothing to read) and gives us a page with a file offset to start
        # from.
        for page in self.pages:
            if not page.read:
                break
        else:
            return self.rows
        # Read the entire rest of the file into memory.
        self.stream.seek(page.file_offset)
        data = self.stream.read()
        buffer = BytesIO(data)
        # Shortcut out if we've already read everything, but don't bother
        # optimizing previous partial reads.
        while len(self.rows) < self.n_rows:
            self._read_row(buffer)
        # Delete all pages; they don't matter anymore, and that's easier than
        # updating them to reflect the reads we've done.
        self.pages.clear()
        return self.rows

    def find(self, key: uuid.UUID) -> AddressRow:
        """Read the row for the given UUID.

        Parameters
        ----------
        key : `uuid.UUID`
            UUID to find.

        Returns
        -------
        row : `AddressRow`
            Addresses for the given UUID.
        """
        if (row := self.rows.get(key)) is not None:
            return row
        if self.n_rows == 0 or not self.pages:
            raise LookupError(f"Address for {key} not found.")

        # Use a binary search to find the page containing the target UUID.
        left = 0
        right = len(self.pages) - 1
        while left <= right:
            mid = left + ((right - left) // 2)
            self._read_page(mid)
            if (row := self.rows.get(key)) is not None:
                return row
            bounds = self.page_bounds[mid]
            if key.int < bounds.uuid_int_begin:
                right = mid - 1
            elif key.int >= bounds.uuid_int_end:
                # uuid_int_end is exclusive, so equality also means the key
                # could only be on a later page.
                left = mid + 1
            else:
                # Should have been on this page, but it wasn't.
                raise LookupError(f"Address for {key} not found.")

        # Ran out of pages to search.
        raise LookupError(f"Address for {key} not found.")

    def _read_page(self, page_index: int, page_stream: BytesIO | None = None) -> bool:
        page = self.pages[page_index]
        if page.read:
            return False
        if page_stream is None:
            self.stream.seek(page.file_offset)
            page_stream = BytesIO(self.stream.read(page.n_rows * self.row_size))
        row = self._read_row(page_stream)
        uuid_int_begin = row.key.int
        for _ in range(1, page.n_rows):
            row = self._read_row(page_stream)
        uuid_int_end = row.key.int + 1  # Python's loop scoping rules are actually useful here!
        page.read = True
        bounds = PageBounds(page_index=page_index, uuid_int_begin=uuid_int_begin, uuid_int_end=uuid_int_end)
        self.page_bounds[page_index] = bounds
        _LOG.debug("Read page %s with rows [%s:%s].", bounds, page.begin, page.end)
        return True

    def _read_row(self, page_stream: BytesIO) -> AddressRow:
        row = AddressRow.read(page_stream, self.n_addresses, self.int_size)
        self.rows[row.key] = row
        _LOG.debug("Read address row %s.", row)
        return row
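

# Lookup sketch (illustrative; the archive name, member name, and per-row
# address count are assumptions, not fixed by this module): find one row by
# UUID without reading the whole address file::
#
#     with zipfile.ZipFile("graph.qgraph") as zf:
#         with AddressReader.open_in_zip(
#             zf, "quanta", page_size=DEFAULT_PAGE_SIZE, n_addresses=2, int_size=8
#         ) as reader:
#             row = reader.find(some_quantum_id)  # placeholder UUID
#
# Only the header and the page(s) visited by the binary search in `find` are
# read from the archive.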


@dataclasses.dataclass
class MultiblockWriter:
    """A helper object for writing multi-block files."""

    stream: IO[bytes]
    """A binary file-like object to write to."""

    int_size: int
    """Number of bytes to use for all integers."""

    file_size: int = 0
    """Running size of the full file."""

    addresses: dict[uuid.UUID, Address] = dataclasses.field(default_factory=dict)
    """Running map of all addresses added to the file so far.

    When the multi-block file is fully written, this is appended to
    `AddressWriter.addresses` to write the corresponding address file.
    """

    @classmethod
    @contextmanager
    def open_in_zip(
        cls, zf: zipfile.ZipFile, name: str, int_size: int, use_tempfile: bool = False
    ) -> Iterator[MultiblockWriter]:
        """Open a writer for a file in a zip archive.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Zip archive to add the file to.
        name : `str`
            Base name for the multi-block file; an extension will be added.
        int_size : `int`
            Number of bytes to use for all integers.
        use_tempfile : `bool`, optional
            If `True`, send writes to a temporary file and only add the file to
            the zip archive when the context manager closes. This involves
            more overall I/O, but it permits multiple multi-block files to be
            open for writing in the same zip archive at once.

        Returns
        -------
        writer : `contextlib.AbstractContextManager` [ `MultiblockWriter` ]
            Context manager that returns a writer when entered.
        """
        filename = f"{name}.mb"
        if use_tempfile:
            with tempfile.NamedTemporaryFile(suffix=filename) as tmp:
                yield MultiblockWriter(tmp, int_size)
                tmp.flush()
                zf.write(tmp.name, filename)
        else:
            with zf.open(f"{name}.mb", mode="w", force_zip64=True) as stream:
                yield MultiblockWriter(stream, int_size)

    def write_bytes(self, id: uuid.UUID, data: bytes) -> Address:
        """Write raw bytes to the multi-block file.

        Parameters
        ----------
        id : `uuid.UUID`
            Unique ID of the object described by this block.
        data : `bytes`
            Data to store directly.

        Returns
        -------
        address : `Address`
            Address of the bytes just written.
        """
        assert id not in self.addresses, "Duplicate write to multi-block file detected."
        self.stream.write(len(data).to_bytes(self.int_size))
        self.stream.write(data)
        block_size = len(data) + self.int_size
        address = Address(offset=self.file_size, size=block_size)
        self.file_size += block_size
        self.addresses[id] = address
        return address

    def write_model(self, id: uuid.UUID, model: pydantic.BaseModel, compressor: Compressor) -> Address:
        """Write a model to the multi-block file as compressed JSON.

        Parameters
        ----------
        id : `uuid.UUID`
            Unique ID of the object described by this block.
        model : `pydantic.BaseModel`
            Model to convert to JSON and compress.
        compressor : `Compressor`
            Object with a ``compress`` method that takes and returns `bytes`.

        Returns
        -------
        address : `Address`
            Address of the bytes just written.
        """
        json_data = model.model_dump_json().encode()
        compressed_data = compressor.compress(json_data)
        return self.write_bytes(id, compressed_data)
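

# Writing sketch (illustrative; the archive name, member name, model mapping,
# and the zlib codec are assumptions, not fixed by this module): write one
# compressed model per UUID and record its addresses in a companion address
# file::
#
#     import zlib
#
#     address_writer = AddressWriter()
#     with zipfile.ZipFile("graph.qgraph", "w") as zf:
#         with MultiblockWriter.open_in_zip(zf, "quanta", int_size=8) as writer:
#             for quantum_id, model in models.items():  # hypothetical mapping
#                 writer.write_model(quantum_id, model, zlib)
#         address_writer.addresses.append(writer.addresses)
#         address_writer.write_to_zip(zf, "quanta", int_size=8)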


@dataclasses.dataclass
class MultiblockReader:
    """A helper object for reading multi-block files."""

    stream: IO[bytes]
    """A binary file-like object to read from."""

    int_size: int
    """Number of bytes to use for all integers."""

    @classmethod
    @contextmanager
    def open_in_zip(cls, zf: zipfile.ZipFile, name: str, *, int_size: int) -> Iterator[MultiblockReader]:
        """Open a reader for a file in a zip archive.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Zip archive to read the file from.
        name : `str`
            Base name for the multi-block file; an extension will be added.
        int_size : `int`
            Number of bytes to use for all integers.

        Returns
        -------
        reader : `contextlib.AbstractContextManager` [ `MultiblockReader` ]
            Context manager that returns a reader when entered.
        """
        with zf.open(f"{name}.mb", mode="r") as stream:
            yield MultiblockReader(stream, int_size)

    @classmethod
    def read_all_bytes_in_zip(
        cls, zf: zipfile.ZipFile, name: str, *, int_size: int, page_size: int
    ) -> Iterator[bytes]:
        """Iterate over all of the byte blocks in a file in a zip archive.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Zip archive to read the file from.
        name : `str`
            Base name for the multi-block file; an extension will be added.
        int_size : `int`
            Number of bytes to use for all integers.
        page_size : `int`
            Approximate number of bytes to read at a time.

        Returns
        -------
        byte_iter : `~collections.abc.Iterator` [ `bytes` ]
            Iterator over blocks.
        """
        with zf.open(f"{name}.mb", mode="r") as zf_stream:
            # The standard library typing of IO[bytes] tiers isn't consistent.
            buffered_stream = BufferedReader(zf_stream)  # type: ignore[type-var]
            size_data = buffered_stream.read(int_size)
            while size_data:
                internal_size = int.from_bytes(size_data)
                yield buffered_stream.read(internal_size)
                size_data = buffered_stream.read(int_size)

    @classmethod
    def read_all_models_in_zip(
        cls,
        zf: zipfile.ZipFile,
        name: str,
        model_type: type[_T],
        decompressor: Decompressor,
        *,
        int_size: int,
        page_size: int,
    ) -> Iterator[_T]:
        """Iterate over all of the models in a file in a zip archive.

        Parameters
        ----------
        zf : `zipfile.ZipFile`
            Zip archive to read the file from.
        name : `str`
            Base name for the multi-block file; an extension will be added.
        model_type : `type` [ `pydantic.BaseModel` ]
            Pydantic model to validate JSON with.
        decompressor : `Decompressor`
            Object with a ``decompress`` method that takes and returns `bytes`.
        int_size : `int`
            Number of bytes to use for all integers.
        page_size : `int`
            Approximate number of bytes to read at a time.

        Returns
        -------
        model_iter : `~collections.abc.Iterator` [ `pydantic.BaseModel` ]
            Iterator over model instances.
        """
        for compressed_data in cls.read_all_bytes_in_zip(zf, name, int_size=int_size, page_size=page_size):
            json_data = decompressor.decompress(compressed_data)
            yield model_type.model_validate_json(json_data)

    def read_bytes(self, address: Address) -> bytes | None:
        """Read raw bytes from the multi-block file.

        Parameters
        ----------
        address : `Address`
            Offset and size of the data to read.

        Returns
        -------
        data : `bytes` or `None`
            Data read directly, or `None` if the address has zero size.
        """
        if not address.size:
            return None
        self.stream.seek(address.offset)
        data = self.stream.read(address.size)
        internal_size = int.from_bytes(data[: self.int_size])
        data = data[self.int_size :]
        if len(data) != internal_size:
            raise InvalidQuantumGraphFileError(
                f"Internal size {internal_size} does not match loaded data size {len(data)}."
            )
        return data

    def read_model(self, address: Address, model_type: type[_T], decompressor: Decompressor) -> _T | None:
        """Read a single compressed JSON block.

        Parameters
        ----------
        address : `Address`
            Size and offset of the block.
        model_type : `type` [ `pydantic.BaseModel` ]
            Pydantic model to validate JSON with.
        decompressor : `Decompressor`
            Object with a ``decompress`` method that takes and returns `bytes`.

        Returns
        -------
        model : `pydantic.BaseModel` or `None`
            Validated model, or `None` if the address has zero size.
        """
        compressed_data = self.read_bytes(address)
        if compressed_data is None:
            return None
        json_data = decompressor.decompress(compressed_data)
        return model_type.model_validate_json(json_data)
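

# Reading sketch (illustrative; the archive name, member name, model type, and
# the zlib codec are assumptions, not fixed by this module): combine
# `AddressReader` and `MultiblockReader` to load a single model by UUID::
#
#     import zlib
#
#     with zipfile.ZipFile("graph.qgraph") as zf:
#         with (
#             AddressReader.open_in_zip(
#                 zf, "quanta", page_size=DEFAULT_PAGE_SIZE, n_addresses=1, int_size=8
#             ) as addresses,
#             MultiblockReader.open_in_zip(zf, "quanta", int_size=8) as blocks,
#         ):
#             row = addresses.find(some_quantum_id)  # placeholder UUID
#             model = blocks.read_model(row.addresses[0], SomeModel, zlib)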