Skip to content

Delete unowned documents during split #130240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ public static IndexReshardingMetadata newSplitByMultiple(int shardCount, int mul
return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple));
}

public static boolean isSplitSource(ShardId shardId, @Nullable IndexReshardingMetadata reshardingMetadata) {
return reshardingMetadata != null && reshardingMetadata.isSplit() && reshardingMetadata.getSplit().isSourceShard(shardId.id());
}

public static boolean isSplitTarget(ShardId shardId, @Nullable IndexReshardingMetadata reshardingMetadata) {
return reshardingMetadata != null && reshardingMetadata.isSplit() && reshardingMetadata.getSplit().isTargetShard(shardId.id());
}
Expand All @@ -221,6 +225,16 @@ public IndexReshardingMetadata transitionSplitTargetToNewState(
return new IndexReshardingMetadata(builder.build());
}

public IndexReshardingMetadata transitionSplitSourceToNewState(
ShardId shardId,
IndexReshardingState.Split.SourceShardState newSourceState
) {
assert state instanceof IndexReshardingState.Split;
IndexReshardingState.Split.Builder builder = new IndexReshardingState.Split.Builder((IndexReshardingState.Split) state);
builder.setSourceShardState(shardId.getId(), newSourceState);
return new IndexReshardingMetadata(builder.build());
}

/**
* @return the split state of this metadata block, or throw IllegalArgumentException if this metadata doesn't represent a split
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,12 @@ public SourceShardState getSourceShardState(int shardNum) {
return sourceShards[shardNum];
}

public boolean isSourceShard(int shardId) {
return shardId < shardCountBefore();
}

public boolean isTargetShard(int shardId) {
return shardId >= shardCountBefore();
return isSourceShard(shardId) == false;
}

/**
Expand Down Expand Up @@ -389,6 +393,10 @@ public Stream<TargetShardState> targetStates() {
return Arrays.stream(targetShards);
}

public Stream<SourceShardState> sourceStates() {
return Arrays.stream(sourceShards);
}

/**
* Check whether all target shards for the given source shard are done.
* @param shardNum a source shard index greater than or equal to 0 and less than the original shard count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();

if (sourceShardRouting.active() == false) {
assert false : sourceShardRouting;
assert false : sourceShardRouting.shortSummary();
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
return null;
}
Expand All @@ -1014,7 +1014,7 @@ private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
assert false : "Source node for reshard does not exist: " + sourceShardRouting.currentNodeId();
logger.trace(
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
sourceShardRouting
sourceShardRouting.shortSummary()
);
return null;
}
Expand Down