[ENH]: Parallelize fetching sparse indexes of a segment #5146
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
Parallelize Segment Sparse Index Fetching
This PR refactors the record and metadata segment index loading logic to fetch all sparse segment blockfiles concurrently using tokio::join!, which is intended to improve segment load performance. To achieve this, new helper functions for asynchronous index loading were introduced, common loading logic was extracted, and BlockfileProvider's trait bounds were clarified using new ReadKey/ReadValue traits for improved type safety across async boundaries.
Key Changes
• Refactored RecordSegmentReader and MetadataSegmentReader::from_segment to load all blockfile readers concurrently (tokio::join!).
Affected Areas
• rust/segment/src/blockfile_record.rs
This summary was automatically generated by @propel-code-bot
71a61ef to 0ea1680
@@ -42,6 +42,29 @@ impl Debug for BlockfileProvider {
        }
    }
}
pub trait ReadKey<'a>:
ty
0ea1680 to 0c31c60
2 Jobs Failed: PR checks / Rust tests / Integration test ci_k8s_integration_slow 1
PR checks / all-required-pr-checks-passed
Summary: 1 successful workflow, 1 failed workflow
Last updated: 2025-07-28 18:06:41 UTC
    bool_metadata_reader,
    f32_metadata_reader,
    u32_metadata_reader,
) = tokio::join!(
[BestPractice]
Consider adding error handling for the case where one of the parallel operations fails but others succeed. The current implementation returns only the first error it encounters, but you might want to log which specific index failed to load for better debugging.
-) = tokio::join!(
+let (
+    pls_reader,
+    string_metadata_reader,
+    bool_metadata_reader,
+    f32_metadata_reader,
+    u32_metadata_reader,
+) = tokio::join!(
+    pls_future,
+    string_metadata_future,
+    bool_metadata_future,
+    f32_metadata_future,
+    u32_metadata_future
+);
+// Handle results and create index readers with better error context
+let pls_reader = pls_reader.map_err(|e| {
+    tracing::error!("Failed to load PLS reader: {:?}", e);
+    e
+})?;
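One way to apply the "log which specific index failed" idea to every reader, not only the PLS reader, is to tag each result with its index name. The sketch below is illustrative only: IndexLoadError, with_index, and failed_indexes are hypothetical names that do not come from this PR, and plain String errors stand in for the real error types.

```rust
use std::fmt;

/// Hypothetical error type that records which index failed to load.
#[derive(Debug)]
struct IndexLoadError {
    index: &'static str,
    cause: String,
}

impl fmt::Display for IndexLoadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to load {} index: {}", self.index, self.cause)
    }
}

impl std::error::Error for IndexLoadError {}

/// Tags a load result with the name of the index it belongs to.
fn with_index<T>(index: &'static str, res: Result<T, String>) -> Result<T, IndexLoadError> {
    res.map_err(|cause| IndexLoadError { index, cause })
}

/// Names of every index whose load failed. This is only possible when
/// all results are available, as they are after a join!-style await.
fn failed_indexes<T>(results: &[(&'static str, Result<T, String>)]) -> Vec<&'static str> {
    results
        .iter()
        .filter(|(_, res)| res.is_err())
        .map(|(name, _)| *name)
        .collect()
}
```

With a helper like this, each result returned by the join could be passed through with_index before applying ?, so the propagated error names the index that failed. Reporting every failed index at once only works while all results are still awaited.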
0c31c60 to dd79c63
let (pl_result, vm_result) = tokio::join!(
    Self::posting_list_reader_from_id(pl_id, blockfile_provider, prefix_path),
    Self::versions_map_reader_from_id(
        versions_id,
        blockfile_provider,
        prefix_path
    )
);
(pl_result?, vm_result?)
[BestPractice]
Consider using futures::try_join! here. It can make the code more concise and has the benefit of failing fast: if one of the futures fails, the others are dropped, which can save resources. tokio::join! runs all futures to completion regardless of failures.
To call it unqualified as try_join!, add use futures::try_join; at the top of the file.
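The difference described above can be seen in a small std-only sketch. It does not use tokio or futures. Instead, a hand-rolled two-future combinator (load_both, a hypothetical name) imitates both behaviours through a fail_fast flag, and block_on is a toy busy-polling executor. The load function and the index names are likewise made up for illustration.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a busy-polling demo executor.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor: polls one future until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Drives two fallible futures concurrently on the current task.
/// fail_fast = false models join! followed by ? on each result;
/// fail_fast = true models try_join!, returning on the first error.
async fn load_both<A, B, E>(
    a: impl Future<Output = Result<A, E>>,
    b: impl Future<Output = Result<B, E>>,
    fail_fast: bool,
) -> Result<(A, B), E> {
    let mut a = pin!(a);
    let mut b = pin!(b);
    let mut ra: Option<Result<A, E>> = None;
    let mut rb: Option<Result<B, E>> = None;
    poll_fn(|cx| {
        if ra.is_none() {
            if let Poll::Ready(res) = a.as_mut().poll(cx) {
                ra = Some(res);
            }
        }
        // Early exit: return before `b` is polled again; `b` is then dropped.
        if fail_fast && matches!(ra, Some(Err(_))) {
            return Poll::Ready(Err(ra.take().unwrap().err().unwrap()));
        }
        if rb.is_none() {
            if let Poll::Ready(res) = b.as_mut().poll(cx) {
                rb = Some(res);
            }
        }
        if fail_fast && matches!(rb, Some(Err(_))) {
            return Poll::Ready(Err(rb.take().unwrap().err().unwrap()));
        }
        match (ra.take(), rb.take()) {
            // Both finished: report the first error, like (a?, b?).
            (Some(x), Some(y)) => Poll::Ready(x.and_then(|x| y.map(|y| (x, y)))),
            (x, y) => {
                ra = x;
                rb = y;
                Poll::Pending
            }
        }
    })
    .await
}

/// Hypothetical stand-in for opening one blockfile reader; bumps
/// `finished` so the caller can count how many loads actually ran.
async fn load(name: &'static str, ok: bool, finished: &Cell<u32>) -> Result<&'static str, String> {
    finished.set(finished.get() + 1);
    if ok { Ok(name) } else { Err(format!("failed to load {name}")) }
}

/// Runs a failing load next to a succeeding one under both strategies and
/// returns how many loads ran in each case: (join-style, try_join-style).
fn compare() -> (u32, u32) {
    let (joined, tried) = (Cell::new(0), Cell::new(0));
    let r1 = block_on(load_both(load("pls", false, &joined), load("bool", true, &joined), false));
    let r2 = block_on(load_both(load("pls", false, &tried), load("bool", true, &tried), true));
    assert!(r1.is_err() && r2.is_err());
    (joined.get(), tried.get())
}
```

Both strategies return the same error here. The difference is the work done after the first failure: the join-style run still executes the second load, while the fail-fast run drops it without polling it.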
Yeah +1, should we use try_join so we can early bail?
That's a great point! Using futures::try_join! is indeed a best practice here. It allows for early exit if any of the parallel operations fail, which can save resources and provide clearer error handling by immediately propagating the first error. tokio::join! would continue to run all futures even if one has already failed.
I've prepared suggestions for the changes in rust/index/src/spann/types.rs, rust/segment/src/blockfile_metadata.rs, and rust/segment/src/blockfile_record.rs to incorporate try_join!. You'll also need to add use futures::try_join; to the top of each of these files.
-let (pl_result, vm_result) = tokio::join!(
-    Self::posting_list_reader_from_id(pl_id, blockfile_provider, prefix_path),
-    Self::versions_map_reader_from_id(
-        versions_id,
-        blockfile_provider,
-        prefix_path
-    )
-);
-(pl_result?, vm_result?)
+let (pl_result, vm_result) = futures::try_join!(
+    Self::posting_list_reader_from_id(pl_id, blockfile_provider, prefix_path),
+    Self::versions_map_reader_from_id(
+        versions_id,
+        blockfile_provider,
+        prefix_path
+    )
+)?;
+(pl_result, vm_result)
-let (pl_result, vm_result) = tokio::join!(
-    Self::posting_list_reader_from_id(pl_id, blockfile_provider, prefix_path),
-    Self::versions_map_reader_from_id(
-        versions_id,
-        blockfile_provider,
-        prefix_path
-    )
-);
-(pl_result?, vm_result?)
+let (
+    pls_reader,
+    string_metadata_reader,
+    bool_metadata_reader,
+    f32_metadata_reader,
+    u32_metadata_reader,
+) = futures::try_join!(
+    pls_future,
+    string_metadata_future,
+    bool_metadata_future,
+    f32_metadata_future,
+    u32_metadata_future
+)?;
-let (pl_result, vm_result) = tokio::join!(
-    Self::posting_list_reader_from_id(pl_id, blockfile_provider, prefix_path),
-    Self::versions_map_reader_from_id(
-        versions_id,
-        blockfile_provider,
-        prefix_path
-    )
-);
-(pl_result?, vm_result?)
+let (
+    max_offset_id_result,
+    user_id_to_id_result,
+    id_to_user_id_result,
+    id_to_data_result,
+) = futures::try_join!(
+    max_offset_id_future,
+    user_id_to_id_future,
+    id_to_user_id_future,
+    id_to_data_future
+)?;
let (
    max_offset_id_result,
    user_id_to_id_result,
    id_to_user_id_result,
    id_to_data_result,
) = tokio::join!(
    max_offset_id_future,
    user_id_to_id_future,
    id_to_user_id_future,
    id_to_data_future
);

if id_to_user_id_segment_path != max_offset_segment_path {
let max_offset_id_bf_reader = max_offset_id_result?;
let user_id_to_id = user_id_to_id_result?;
let id_to_user_id = id_to_user_id_result?;
let id_to_data = id_to_data_result?;
[BestPractice]
Consider using futures::try_join! here. It can make the code more concise and has the benefit of failing fast: if one of the futures fails, the others are dropped, which can save resources. tokio::join! runs all futures to completion regardless of failures.
To call it unqualified as try_join!, add use futures::try_join; at the top of the file.
Description of changes
Test plan
TBD
pytest for python, yarn test for js, cargo test for rust
Migration plan
Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?
Observability plan
What is the plan to instrument and monitor this change?
Documentation Changes
Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?