
fix: loading of datasets from Disk (#7373) #7489


Open · wants to merge 5 commits into main
9 changes: 7 additions & 2 deletions src/datasets/arrow_dataset.py
@@ -5191,8 +5191,13 @@ def _generate_tables_from_shards(shards: list["Dataset"], batch_size: int):

     @staticmethod
     def _generate_tables_from_cache_file(filename: str):
-        for batch_idx, batch in enumerate(_memory_mapped_record_batch_reader_from_file(filename)):
-            yield batch_idx, pa.Table.from_batches([batch])
+        reader, mmap_stream = _memory_mapped_record_batch_reader_from_file(filename)
+        try:
+            for batch_idx, batch in enumerate(reader):
+                yield batch_idx, pa.Table.from_batches([batch])
+        finally:
+            reader.close()
+            mmap_stream.close()

     def to_iterable_dataset(self, num_shards: Optional[int] = 1) -> "IterableDataset":
         """Get an [`datasets.IterableDataset`] from a map-style [`datasets.Dataset`].
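For context, a minimal sketch (not part of this PR) of the code path the fix touches, assuming a dataset backed by a single on-disk Arrow cache file; the directory name is hypothetical:

from datasets import Dataset, load_from_disk

# Save a small dataset and reload it so it is memory-mapped from disk.
Dataset.from_dict({"nums": list(range(10))}).save_to_disk("tmp_ds")
ds = load_from_disk("tmp_ds")

# Iterating the IterableDataset may go through _generate_tables_from_cache_file,
# which with this change closes its record batch reader and memory map once
# iteration finishes.
for example in ds.to_iterable_dataset(num_shards=1):
    pass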
33 changes: 30 additions & 3 deletions src/datasets/table.py
@@ -35,19 +35,44 @@ def _in_memory_arrow_table_from_file(filename: str) -> pa.Table:
     in_memory_stream = pa.input_stream(filename)
     opened_stream = pa.ipc.open_stream(in_memory_stream)
     pa_table = opened_stream.read_all()
+    opened_stream.close()
+    in_memory_stream.close()
     return pa_table


 def _in_memory_arrow_table_from_buffer(buffer: pa.Buffer) -> pa.Table:
     stream = pa.BufferReader(buffer)
     opened_stream = pa.ipc.open_stream(stream)
     table = opened_stream.read_all()
+    opened_stream.close()
+    stream.close()
     return table


-def _memory_mapped_record_batch_reader_from_file(filename: str) -> pa.RecordBatchStreamReader:
+def _memory_mapped_record_batch_reader_from_file(
+    filename: str,
+) -> tuple[pa.RecordBatchStreamReader, pa.MemoryMappedFile]:
+    """
+    Creates a memory-mapped record batch reader from a file.
+
+    This function opens a file as a memory-mapped stream and initializes
+    a RecordBatchStreamReader for reading Arrow record batches from the stream.
+
+    Note: Both the returned RecordBatchStreamReader and MemoryMappedFile
+    must be explicitly closed after use to release resources.
+
+    Args:
+        filename (str): The path to the file to be memory-mapped.
+
+    Returns:
+        tuple[pa.RecordBatchStreamReader, pa.MemoryMappedFile]:
+            A tuple containing:
+            - A RecordBatchStreamReader for reading Arrow record batches.
+            - A MemoryMappedFile object representing the memory-mapped file.
+
+    """
     memory_mapped_stream = pa.memory_map(filename)
-    return pa.ipc.open_stream(memory_mapped_stream)
+    return pa.ipc.open_stream(memory_mapped_stream), memory_mapped_stream


 def read_schema_from_file(filename: str) -> pa.Schema:
@@ -61,8 +86,10 @@ def read_schema_from_file(filename: str) -> pa.Schema:


 def _memory_mapped_arrow_table_from_file(filename: str) -> pa.Table:
-    opened_stream = _memory_mapped_record_batch_reader_from_file(filename)
+    opened_stream, memory_mapped_stream = _memory_mapped_record_batch_reader_from_file(filename)
     pa_table = opened_stream.read_all()
+    opened_stream.close()
+    memory_mapped_stream.close()
     return pa_table
Comment on lines +92 to 93
Member


is it really ok to close the memory map, given the memory mapped table is still in use ?

Author


Yes, pa_table includes all information and is a copy: see the example of read_all(). https://arrow.apache.org/docs/python/ipc.html

Member


I think read_all() doesn't load the data in memory here, it loads buffers that are memory mapped from disk


@lasuomela (Apr 23, 2025)


I can confirm that this works:

import pyarrow as pa

BATCH_SIZE = 10000
NUM_BATCHES = 1000

schema = pa.schema([pa.field('nums', pa.int32())])

# Write in stream format
with pa.OSFile('bigfile.arrow', 'wb') as sink:
    with pa.ipc.new_stream(sink, schema) as writer:
        for _ in range(NUM_BATCHES):
            batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
            writer.write(batch)

# Read the stream back
with pa.memory_map('bigfile.arrow', 'rb') as source:
    with pa.ipc.open_stream(source) as reader:
        table = reader.read_all()

print("LEN:", table.num_rows)
print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))

# Read the first batch
print("")
print("First batch:")
print(table[0][0:BATCH_SIZE])

Out:

LEN: 10000000
RSS: 0MB

First batch:
[
  [
    0,
    1,
    2,
    3,
    4,
    ...
    9995,
    9996,
    9997,
    9998,
    9999
  ]
]

Author


@lasuomela Thanks a lot for checking! I’m currently on vacation and without a laptop to verify it myself.
@lhoestq Would this be sufficient proof for you?
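For completeness, a standalone sketch (not part of the PR) of the open/iterate/close pattern the change adopts, using plain PyArrow and a hypothetical file name:

import pyarrow as pa

# Write a tiny Arrow stream file so the example is self-contained.
schema = pa.schema([pa.field("nums", pa.int32())])
with pa.OSFile("example.arrow", "wb") as sink:
    with pa.ipc.new_stream(sink, schema) as writer:
        writer.write(pa.record_batch([pa.array(range(10), type=pa.int32())], schema=schema))

# Keep a handle to the memory map so it can be closed together with the reader,
# mirroring the try/finally added in _generate_tables_from_cache_file.
memory_mapped_stream = pa.memory_map("example.arrow")
reader = pa.ipc.open_stream(memory_mapped_stream)
try:
    total_rows = sum(batch.num_rows for batch in reader)
finally:
    reader.close()
    memory_mapped_stream.close()

print(total_rows)  # 10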


