chore(cubestore): ClusterSend chunking by partition count #9819

Status: Open. Wants to merge 1 commit into base: master.
12 changes: 12 additions & 0 deletions rust/cubestore/cubestore/src/config/mod.rs
@@ -364,6 +364,8 @@ pub struct Config {
pub trait ConfigObj: DIService {
fn partition_split_threshold(&self) -> u64;

fn cluster_send_split_threshold(&self) -> u64;

fn partition_size_split_threshold_bytes(&self) -> u64;

fn max_partition_split_threshold(&self) -> u64;
@@ -555,6 +557,7 @@ pub trait ConfigObj: DIService {
#[derive(Debug, Clone)]
pub struct ConfigObjImpl {
pub partition_split_threshold: u64,
pub cluster_send_split_threshold: u64,
pub partition_size_split_threshold_bytes: u64,
pub max_partition_split_threshold: u64,
pub compaction_chunks_total_size_threshold: u64,
@@ -662,6 +665,10 @@ impl ConfigObj for ConfigObjImpl {
self.partition_split_threshold
}

fn cluster_send_split_threshold(&self) -> u64 {
self.cluster_send_split_threshold
}

fn partition_size_split_threshold_bytes(&self) -> u64 {
self.partition_size_split_threshold_bytes
}
@@ -1242,6 +1249,10 @@ impl Config {
"CUBESTORE_PARTITION_SPLIT_THRESHOLD",
1048576 * 2,
),
cluster_send_split_threshold: env_parse(
"CUBESTORE_CLUSTER_SEND_SPLIT_THRESHOLD",
4,
),
partition_size_split_threshold_bytes: env_parse_size(
"CUBESTORE_PARTITION_SIZE_SPLIT_THRESHOLD",
100 * 1024 * 1024,
@@ -1597,6 +1608,7 @@ impl Config {
.join(format!("{}-local-store", name)),
dump_dir: None,
partition_split_threshold: 20,
cluster_send_split_threshold: 4,
partition_size_split_threshold_bytes: 2 * 1024,
max_partition_split_threshold: 20,
compaction_chunks_count_threshold: 1,
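For context on how this new setting is used downstream: the per-worker chunk limits are simple products of the existing partition thresholds and the new multiplier. A minimal standalone sketch with the default values from this diff (the variable names are illustrative, not the cubestore API):

// Illustrative only: how CUBESTORE_CLUSTER_SEND_SPLIT_THRESHOLD scales the
// existing split thresholds into per-chunk limits, using the defaults above.
fn main() {
    // Defaults from config/mod.rs in this PR (per-partition values).
    let partition_split_threshold: u64 = 1_048_576 * 2; // rows
    let partition_size_split_threshold_bytes: u64 = 100 * 1024 * 1024; // bytes
    let cluster_send_split_threshold: u64 = 4; // CUBESTORE_CLUSTER_SEND_SPLIT_THRESHOLD

    // Each chunk sent to a worker is capped at roughly this many rows and bytes.
    let rows_split_threshold = partition_split_threshold * cluster_send_split_threshold;
    let file_size_split_threshold =
        partition_size_split_threshold_bytes * cluster_send_split_threshold;

    println!("rows per chunk:  {}", rows_split_threshold); // 8388608
    println!("bytes per chunk: {}", file_size_split_threshold); // 419430400
}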
68 changes: 63 additions & 5 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
@@ -1104,7 +1104,7 @@ impl ClusterSendExec {
ps
}

fn issue_filters(ps: &[IdRow<Partition>]) -> Vec<(u64, RowRange)> {
fn issue_filters(ps: &[IdRow<Partition>]) -> Vec<(IdRow<Partition>, RowRange)> {
if ps.is_empty() {
return Vec::new();
}
@@ -1114,7 +1114,7 @@
if multi_id.is_none() {
return ps
.iter()
.map(|p| (p.get_id(), RowRange::default()))
.map(|p| (p.clone(), RowRange::default()))
.collect();
}
let filter = RowRange {
@@ -1129,7 +1129,7 @@
} else {
filter.clone()
};
r.push((p.get_id(), pf))
r.push((p.clone(), pf))
}
r
}
@@ -1138,7 +1138,8 @@
c: &dyn ConfigObj,
logical: Vec<Vec<InlineCompoundPartition>>,
) -> Vec<(String, (Vec<(u64, RowRange)>, Vec<InlineTableId>))> {
let mut m: HashMap<_, (Vec<(u64, RowRange)>, Vec<InlineTableId>)> = HashMap::new();
let mut m: HashMap<_, (Vec<(IdRow<Partition>, RowRange)>, Vec<InlineTableId>)> =
HashMap::new();
for ps in &logical {
let inline_table_ids = ps
.iter()
@@ -1178,7 +1179,64 @@

let mut r = m.into_iter().collect_vec();
r.sort_unstable_by(|l, r| l.0.cmp(&r.0));
r
r.into_iter()
.map(|(worker, data)| {
let splitted = Self::split_worker_parititons(c, data);
splitted.into_iter().map(move |data| (worker.clone(), data))
})
.flatten()
.collect_vec()
}

fn split_worker_parititons(
c: &dyn ConfigObj,
partitions: (Vec<(IdRow<Partition>, RowRange)>, Vec<InlineTableId>),
) -> Vec<(Vec<(u64, RowRange)>, Vec<InlineTableId>)> {
if !partitions.1.is_empty()
|| partitions
.0
.iter()
.any(|(p, _)| p.get_row().multi_partition_id().is_some())
{
return vec![(
partitions
.0
.into_iter()
.map(|(p, range)| (p.id, range))
.collect_vec(),
partitions.1,
)];
}
let rows_split_threshold = c.partition_split_threshold() * c.cluster_send_split_threshold();
let file_size_split_threshold =
c.partition_size_split_threshold_bytes() * c.cluster_send_split_threshold();
let mut result = vec![];
let mut current_rows = 0;
let mut current_files_size = 0;
let mut current_chunk = vec![];
let (partitions, _) = partitions;
for (partition, range) in partitions {
let rows = partition.get_row().main_table_row_count();
let file_size = partition.get_row().file_size().unwrap_or_default();
if current_rows + rows > rows_split_threshold
|| current_files_size + file_size > file_size_split_threshold
{
if !current_chunk.is_empty() {
result.push((std::mem::take(&mut current_chunk), vec![]));
current_rows = 0;
current_files_size = 0;
}
}

current_rows += rows;
current_files_size += file_size;
current_chunk.push((partition.id, range));
}
if !current_chunk.is_empty() {
result.push((current_chunk, vec![]));
}

result
}

pub fn with_changed_schema(
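The chunking in split_worker_parititons is a greedy scan: partitions are accumulated in order, and the current chunk is flushed whenever adding the next partition would exceed either the row or the byte limit (a single oversized partition still becomes its own chunk). A minimal standalone sketch of that loop over simplified (id, rows, bytes) tuples, not the actual cubestore types:

// Greedy chunking that mirrors the flush-then-continue structure of
// split_worker_parititons, using plain tuples as a stand-in for partitions.
fn chunk_partitions(
    parts: &[(u64, u64, u64)], // (id, rows, bytes) -- simplified stand-in
    rows_limit: u64,
    bytes_limit: u64,
) -> Vec<Vec<u64>> {
    let mut result = Vec::new();
    let mut chunk = Vec::new();
    let (mut rows, mut bytes) = (0u64, 0u64);
    for &(id, r, b) in parts {
        // Flush the current chunk before this partition would push it over a limit.
        if (rows + r > rows_limit || bytes + b > bytes_limit) && !chunk.is_empty() {
            result.push(std::mem::take(&mut chunk));
            rows = 0;
            bytes = 0;
        }
        rows += r;
        bytes += b;
        chunk.push(id);
    }
    if !chunk.is_empty() {
        result.push(chunk);
    }
    result
}

fn main() {
    // With a row limit of 10, partitions of 4/4/4/9 rows split into [1, 2], [3], [4].
    let parts = [(1, 4, 1), (2, 4, 1), (3, 4, 1), (4, 9, 1)];
    for chunk in chunk_partitions(&parts, 10, u64::MAX) {
        println!("{:?}", chunk);
    }
}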