From fae932e12e000bb916d7df5a89c5605854d39d41 Mon Sep 17 00:00:00 2001
From: Eta
Date: Thu, 18 Jul 2024 02:33:38 -0400
Subject: [PATCH] feat(serialization): Store relative offsets for tensor data
 and headers

Instead of storing the absolute file positions of header entries and
tensor data, this change stores their offsets relative to the beginning
of their respective segments (e.g. the header segment, or the data
segment). This makes it easier to create and edit headers without
needing to already know where all prior segments end. Information about
absolute offsets is instead stored on a segment-by-segment basis in a
"layout" section of the metadata, preceding the metadata and header
sections.
---
 tensorizer/serialization.py | 384 +++++++++++++++++++++++++-----------
 tests/test_serialization.py |  23 ++-
 2 files changed, 286 insertions(+), 121 deletions(-)

diff --git a/tensorizer/serialization.py b/tensorizer/serialization.py
index c534a163..6c4eb10e 100644
--- a/tensorizer/serialization.py
+++ b/tensorizer/serialization.py
@@ -321,6 +321,114 @@ def from_io(
         return cls(version_number, feature_flags, tensor_size, tensor_count)
 
 
+@dataclasses.dataclass
+class _FileLayout:
+    _VERSION_TAG: ClassVar[int] = 1
+    _NUM_ENTRIES: ClassVar[int] = 3
+    metadata_start: Optional[int] = None
+    metadata_end: Optional[int] = None
+    _METADATA_TAG: ClassVar[int] = 1
+    header_start: Optional[int] = None
+    header_end: Optional[int] = None
+    _HEADER_TAG: ClassVar[int] = 2
+    data_start: Optional[int] = None
+    _DATA_TAG: ClassVar[int] = 3
+
+    _FORMAT: ClassVar[struct.Struct] = struct.Struct(
+        "<"
+        "B"  # Layout version tag
+        "B"  # Number of entries
+        "B"  # Metadata segment tag
+        "Q"  # Metadata segment start
+        "Q"  # Metadata segment end
+        "B"  # Header segment tag
+        "Q"  # Header segment start
+        "Q"  # Header segment end
+        "B"  # Tensor data segment tag
+        "Q"  # Tensor data segment start
+    )
+
+    _LENGTH_FORMAT: ClassVar[struct.Struct] = struct.Struct("<Q")
+    _LENGTH_SIZE: ClassVar[int] = _LENGTH_FORMAT.size
+    _FORMAT_SIZE: ClassVar[int] = _FORMAT.size
+    _FORMAT_SIZE_WITH_LENGTH: ClassVar[int] = _LENGTH_SIZE + _FORMAT_SIZE
+
+    @classmethod
+    def placeholder(cls) -> bytearray:
+        buffer: bytearray = bytearray(cls._FORMAT_SIZE_WITH_LENGTH)
+        cls._LENGTH_FORMAT.pack_into(buffer, 0, cls._FORMAT_SIZE)
+        return buffer
+
+    def pack(self) -> bytearray:
+        if None in (
+            self.metadata_start,
+            self.metadata_end,
+            self.header_start,
+            self.header_end,
+            self.data_start,
+        ):
+            raise RuntimeError("Cannot build; incomplete data")
+
+        buffer: bytearray = self.placeholder()
+
+        self._FORMAT.pack_into(
+            buffer,
+            self._LENGTH_SIZE,
+            self._VERSION_TAG,
+            self._NUM_ENTRIES,
+            self._METADATA_TAG,
+            self.metadata_start,
+            self.metadata_end,
+            self._HEADER_TAG,
+            self.header_start,
+            self.header_end,
+            self._DATA_TAG,
+            self.data_start,
+        )
+
+        return buffer
+
+    @classmethod
+    def parse(cls, buffer: Union[bytearray, bytes]) -> "_FileLayout":
+        (
+            version_tag,
+            num_entries,
+            metadata_tag,
+            metadata_start,
+            metadata_end,
+            header_tag,
+            header_start,
+            header_end,
+            data_tag,
+            data_start,
+        ) = cls._FORMAT.unpack(buffer)
+
+        if version_tag != cls._VERSION_TAG:
+            raise ValueError("Unknown layout version")
+        elif num_entries != cls._NUM_ENTRIES:
+            raise ValueError("Incorrect number of layout entries")
+        elif metadata_tag != cls._METADATA_TAG:
+            raise ValueError("Incorrect metadata segment tag in layout")
+        elif header_tag != cls._HEADER_TAG:
+            raise ValueError("Incorrect header segment tag in layout")
+        elif data_tag != cls._DATA_TAG:
+            raise ValueError("Incorrect data segment tag in layout")
+
+        parsed: cls = cls()
+        parsed.metadata_start = metadata_start
+        parsed.metadata_end = metadata_end
+        parsed.header_start = header_start
+        parsed.header_end = header_end
+        parsed.data_start = data_start
+        return parsed
+
+    @classmethod
+    def from_io(cls, reader) -> "_FileLayout":
+        size: int = cls._LENGTH_FORMAT.unpack(reader.read(cls._LENGTH_SIZE))[0]
+        buffer: bytes = reader.read(size)
+        return cls.parse(buffer)
+
+
 @dataclasses.dataclass(init=False)
 class _TensorHeaderSerializer:
     # Fields with ClassVar are shared across all instances,
@@ -412,7 +520,8 @@ def __init__(
         dtype: bytes,
         shape: Sequence[int],
         data_length: int,
-        file_offset: int,  # location of header in file
+        header_offset: int,  # location of header in the header segment
+        data_offset: int,  # location of tensor data in the data segment
         include_crc32: bool = True,
         include_sha256: bool = True,
         crypt_info: Optional[_crypt_info.CryptInfo] = None,
@@ -423,7 +532,8 @@ def __init__(
         self.shape = shape
         self.dtype = dtype
         self.data_length = data_length
-        self.file_offset = file_offset
+        self.header_offset = header_offset
+        self.data_offset = data_offset
         self.include_crc32 = include_crc32
         self.include_sha256 = include_sha256
 
@@ -445,6 +555,13 @@ def __init__(
                 shape_len=self.shape_len,
             )
         )
+        self.metadata_entry_segment = struct.Struct(
+            self.metadata_entry_segment_template.format(
+                name_len=self.name_len,
+                dtype_len=self.dtype_len,
+                shape_len=self.shape_len,
+            )
+        )
         crc32_len = sha256_len = self.hash_count = 0
         self.has_crc32 = include_crc32
         self.has_sha256 = include_sha256
@@ -476,10 +593,6 @@ def __init__(
             )
         )
 
-    def build(self, tensor_data_offset: int):
-        # tensor_data_offset: location of tensor data in file
-        self.data_offset = tensor_data_offset
-
         self.buffer = bytearray(self.size)
         self.start_segment.pack_into(
             self.buffer,
@@ -520,9 +633,7 @@ def build(self, tensor_data_offset: int):
             self.buffer, self.data_length_offset, self.data_length
         )
 
-        metadata_entry_segment = self.get_metadata_entry_segment()
-
-        self.metadata_entry = metadata_entry_segment.pack(
+        self.metadata_entry = self.metadata_entry_segment.pack(
             self.name_len,  # Name length
             self.name,  # Name
             self.tensor_type.value,  # Whether this is a parameter or a buffer
             self.dtype_len,  # Dtype length
             self.dtype,  # Dtype
             self.shape_len,  # Shape length
             *self.shape,  # Shape
-            self.file_offset,  # Header start (relative to the file)
-            # Tensor data start (relative to the file):
+            self.header_offset,  # Header start (relative to the header segment)
+            # Tensor data start (relative to the tensor data segment):
             self.data_offset,
             self.data_length,  # Tensor length
         )
 
-    def get_metadata_entry_segment(self) -> struct.Struct:
-        return struct.Struct(
-            self.metadata_entry_segment_template.format(
-                name_len=self.name_len,
-                dtype_len=self.dtype_len,
-                shape_len=self.shape_len,
-            )
-        )
-
     def _hashable_segment_views(self):
         # Skip areas where we store hashes and crypt_info
         yield memoryview(self.buffer)[: self.hash_header_offset]
@@ -1705,6 +1807,11 @@ def __init__(
                 "Tensor is encrypted, but decryption was not requested"
             )
 
+        # Read the file layout, if present
+        self._layout: Optional[_FileLayout] = None
+        if version_number >= HEADERS_AT_TOP_TENSORIZER_VERSION:
+            self._layout = _FileLayout.from_io(self._file)
+
         # Read the metadata index of tensors.
         # This is a list of offsets into the file where the per-tensor data
         # is stored.
@@ -1714,6 +1821,11 @@ def __init__( self._file, self._file_header.tensor_count ) ) + if self._layout is not None: + for entry in self._metadata.values(): + entry: TensorEntry + entry.data_offset += self._layout.data_start + entry.offset += self._layout.header_start if not self._metadata: raise ValueError("Tensor index in the file is empty") @@ -3502,26 +3614,87 @@ def __init__( ) self._write(self._file_header.to_bytes()) - self._metadata_start = self._file.tell() - self._metadata_cur = ( - self._metadata_start + 8 - ) # position of next metadata entry we'd write. Leave 8 bytes for metadata length field + self._layout_pos: int = self._file.tell() + self._layout_size: int = self._write(_FileLayout.placeholder()) + self._last_layout: Optional[_FileLayout] = None + + self._flush() # Flush potentially-buffered writes from `_write` calls + + self._metadata_start: int = self._layout_pos + self._layout_size + + # Offset of next metadata entry we'd write, relative to + # the metadata segment (i.e., bytes after self._metadata_start). + # Leave 8 bytes for metadata length field + self._metadata_offset: int = 8 + + # Next header write offset, relative to the header segment + # (i.e., bytes after self._metadata_end). + self._header_offset: int = 0 + + # Next tensor write position relative to the start of the data segment + # (i.e., bytes after self._header_end). + self._tensor_offset: int = 0 + self._metadata_end: Optional[int] = None self._header_end: Optional[int] = None if max_tensors: - # Estimate 256 bytes per metadata entry and 1024 bytes per header entry - self._metadata_end = self._metadata_cur + max_tensors * 256 - # this is less about header_end itself but ensuring that tensor_start is on a 4096-byte aligned boundary - self._header_end = ( - (self._metadata_end + max_tensors * 1024) + 4095 - ) & ~4095 - - self._header_cur = ( - self._metadata_end - ) # is the start of where to write header data, or None - self._tensor_cur = ( - self._header_end - ) # is the start of where to write tensor data. or None + approx_metadata_size: int = 256 * max_tensors + approx_header_size: int = 1024 * max_tensors + + self._metadata_end = self._metadata_start + 8 + approx_metadata_size + # Extend the metadata segment to end on a block boundary + # This allows later manipulation with the fallocate syscall + # on coöperating operating systems to extend this segment + # in the middle of the file + self._metadata_end -= self._metadata_end % -4096 + + self._header_end = self._metadata_end + approx_header_size + # Extend the header segment to end on a block boundary + # This has the same benefits as above, plus positioning + # the following tensor data segment to begin on a block boundary, + # which can allow for more efficient read operations. + self._header_end -= self._header_end % -4096 + + # The layout is static at this point, so it can be written now. 
+            self._update_layout()
+
+    def _tensor_offset_to_file_pos(self, tensor_offset: int) -> int:
+        if self._header_end is None:
+            raise RuntimeError(
+                "The size of the file header isn't known yet, "
+                "so the file position for tensors cannot be determined"
+            )
+        return self._header_end + tensor_offset
+
+    @property
+    def _tensor_segment_end(self) -> int:
+        return self._tensor_offset_to_file_pos(self._tensor_offset)
+
+    def _update_layout(self) -> int:
+        # In case this is called with any of these still unset
+        # (e.g., from closing the serializer without writing anything),
+        # mark those segments as empty
+        metadata_start = self._metadata_start
+        metadata_end = self._metadata_end or metadata_start
+        header_end = self._header_end or metadata_end
+
+        layout: _FileLayout = _FileLayout(
+            metadata_start=metadata_start,
+            metadata_end=metadata_end,
+            header_start=metadata_end,
+            header_end=header_end,
+            data_start=header_end,
+        )
+
+        if layout != self._last_layout:
+            size = self._pwrite(
+                layout.pack(),
+                self._layout_pos,
+                verify=self._layout_size,
+            )
+            self._last_layout = layout
+            return size
+        return 0
 
     @property
     def total_tensor_bytes(self):
@@ -3546,14 +3719,16 @@ def _sync_prologue_state(self):
         """
         # Write our zero-length field, that indicates that this is the last
         # tensor. This will be overwritten if another tensor is written.
-        self._pwrite(struct.pack("<Q", 0), self._tensor_cur)
+        self._pwrite(
+            struct.pack("<Q", 0), self._tensor_segment_end
+        )
@@ -4374,12 +4549,14 @@
     def _prepare_for_write_headers(
         self, write_specs: Sequence[_WriteSpec]
     ) -> None:
-        # We first need to construct the headers so that we know the size of each
         for w in write_specs:
-            dtype_bytes = w.dtype.encode("utf-8")  # type: ignore
+            dtype_bytes: bytes = w.dtype.encode("utf-8")
             if len(dtype_bytes) >= 256:
                 raise ValueError("dtype name length should be less than 256")
 
+            # Each individual tensor begins on an 8-byte aligned boundary
+            self._tensor_offset -= self._tensor_offset % -8
+
             w.header = _TensorHeaderSerializer(
                 w.module_index,
                 w.tensor_type,
@@ -4386,64 +4563,41 @@ def _prepare_for_write_headers(
                 dtype_bytes,
                 w.shape,
                 w.data_length,
-                0,  # placeholder file_offset
+                self._header_offset,
+                self._tensor_offset,
                 include_crc32=w.include_crc32,
                 include_sha256=w.include_sha256,
                 crypt_info=w.crypt_info,
             )
 
-            # Specify the offsets for each metadata entry
-            file_offset = (
-                self._metadata_cur
-            )  # position of next metadata entry to write
+            # Record the offset of this metadata entry
+            w.metadata_offset = self._metadata_offset
 
-        ## metadata
-        for w in write_specs:
-            w.metadata_pos = file_offset
-            file_offset += w.header.get_metadata_entry_segment().size
+            self._metadata_offset += w.header.metadata_entry_segment.size
+            self._header_offset += w.header.size
+            self._tensor_offset += w.data_length
 
-        self._metadata_cur = file_offset
+        # Set or validate self._metadata_end
+        file_offset: int = self._metadata_start + self._metadata_offset
         if self._metadata_end is None:
-            self._metadata_end = self._metadata_cur
+            # Extend to the end of the current block:
+            # set the end of the metadata to the next block boundary
+            file_offset -= file_offset % -4096
+            self._metadata_end = file_offset
         elif file_offset > self._metadata_end:
             raise RuntimeError("Metadata block is full. Increase max_tensors")
 
-        ## headers
-        if self._header_cur is not None:
-            if self._header_cur < file_offset:
-                raise RuntimeError("Somehow wrote past metadata block")
-            file_offset = self._header_cur
-
-        for w in write_specs:
-            w.header.file_offset = file_offset
-            file_offset += w.header.size
-
-        self._header_cur = file_offset
+        # Set or validate self._header_end
+        file_offset = self._metadata_end + self._header_offset
         if self._header_end is None:
-            self._header_end = self._header_cur
-        elif self._header_cur > self._header_end:
+            # Extend to the end of the current block
+            file_offset -= file_offset % -4096
+            self._header_end = file_offset
+        elif file_offset > self._header_end:
             raise RuntimeError("Header block is full. Increase max_tensors")
 
-        ## tensors
-        if self._tensor_cur is None:
-            # The block of tensor data starts on a page-aligned boundary
-            self._tensor_cur = (file_offset + 4095) & ~4095
-        else:
-            if self._tensor_cur < file_offset:
-                raise RuntimeError("Somehow wrote past header block")
-            # Each tensor itself begins on an 8-byte aligned boundary
-            file_offset = (self._tensor_cur + 7) & ~7
-
-        # file_offset is now where we should start writing tensor data
-        for w in write_specs:
-            w.header.build(file_offset)  # type: ignore
-            file_offset += w.data_length
-
-        self._tensor_cur = file_offset
-
-    def _prepare_for_write_meta(
-        self, write_specs: Sequence[_WriteSpec]
-    ) -> None:
+    @staticmethod
+    def _prepare_for_write_meta(write_specs: Sequence[_WriteSpec]) -> None:
         for w in write_specs:
             if not w.tensor.is_meta:
                 continue
@@ -4539,50 +4693,52 @@ def do_commit(
         dependencies: Sequence[_Future],
     ):
         # Fast version: makes one buffer containing the size, metadata, and headers, and writes it in one go
-        header_block_size = self._header_cur - self._metadata_start
-        header_buffer = bytearray(header_block_size)
+        metadata_start: int = self._metadata_start
+        metadata_segment_size: int = self._metadata_end - metadata_start
+        header_block_size: int = metadata_segment_size + self._header_offset
+        header_buffer: bytearray = bytearray(header_block_size)
 
-        metadata_start = self._metadata_start
-        metadata_size = (
-            self._metadata_cur - metadata_start - 8
-        )  # 8 bytes for metadata length field
-        struct.pack_into(" 4096 bytes total from repeating it 25 times
+            serializer.write_tensor(
+                i,
+                f"{long_name_stem}_{i:02d}",
+                TensorType.PARAM,
+                torch.zeros((4, 4), dtype=torch.uint8),
+            )
+        serializer.close()
 
     def test_too_many(self):
         # If you set max_tensors too low, you'll eventually run out of header space
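
For illustration (not part of the patch itself): a minimal reader-side
sketch of how the layout section turns the segment-relative offsets
stored in each metadata entry back into absolute file positions,
assuming the _FileLayout wire format defined above ("<Q" length prefix
followed by the tagged entries); the resolve() helper and its arguments
are hypothetical names for this sketch, not tensorizer API.

    import struct
    from typing import BinaryIO, Tuple

    _LENGTH = struct.Struct("<Q")  # length prefix, as in _FileLayout
    _LAYOUT = struct.Struct("<BBBQQBQQBQ")  # version, entry count, 3 tagged entries

    def resolve(
        f: BinaryIO, header_offset: int, data_offset: int
    ) -> Tuple[int, int]:
        # Read the length-prefixed layout section at the current position
        (size,) = _LENGTH.unpack(f.read(_LENGTH.size))
        fields = _LAYOUT.unpack(f.read(size))
        # fields[6] is the header segment start, fields[9] the tensor
        # data segment start (field order as in _FileLayout._FORMAT)
        header_start, data_start = fields[6], fields[9]
        # Segment-relative offsets become absolute positions, mirroring
        # the deserializer's adjustment above:
        #   entry.offset += layout.header_start
        #   entry.data_offset += layout.data_start
        return header_start + header_offset, data_start + data_offset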