From c0da9b4449ce7fb8c0d676cae4ff239aa1cce072 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Sun, 29 Jun 2025 09:38:43 +0300 Subject: [PATCH 1/4] Add TEST MergeWithLowDiskSpaceIT testForceMergeIsBlockedThenUnblocked (#130189) This adds a new IT for the case where forced merges are blocked (and then unblocked) because of insufficient disk space. This test was suggested in https://github.com/elastic/elasticsearch/pull/127613#pullrequestreview-2900135287. --- .../index/engine/MergeWithLowDiskSpaceIT.java | 122 +++++++++++++++++- 1 file changed, 121 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index c2687468a7f26..7a76872c27a54 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -9,9 +9,12 @@ package org.elasticsearch.index.engine; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.DiskUsageIntegTestCase; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings; @@ -19,16 +22,23 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import 
org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.BeforeClass; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Locale; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.lessThan; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -40,7 +50,14 @@ public static void setAvailableDiskSpaceBufferLimit() { // this has to be big in order to potentially accommodate the disk space for a few 100s of docs and a few merges, // because of the latency to process used disk space updates, and also because we cannot reliably separate indexing from merging // operations at this high abstraction level (merging is triggered more or less automatically in the background) - MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(1_000_000L, 2_000_000L); + MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(10_000_000L, 20_000_000L); + } + + @Override + protected Collection> nodePlugins() { + List> nodePluginsList = new ArrayList<>(super.nodePlugins()); + nodePluginsList.add(TestTelemetryPlugin.class); + return nodePluginsList; } @Override @@ -155,8 +172,111 @@ public void testShardCloseWhenDiskSpaceInsufficient() throws Exception { }); } + public void testForceMergeIsBlockedThenUnblocked() throws Exception { + String node = internalCluster().startNode(); + ensureStableCluster(1); + setTotalSpace(node, Long.MAX_VALUE); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node) + .getThreadPoolMergeExecutorService(); + TestTelemetryPlugin 
testTelemetryPlugin = getTelemetryPlugin(node); + // create some index + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build() + ); + // get current disk space usage (for all indices on the node) + IndicesStatsResponse stats = indicesAdmin().prepareStats().clear().setStore(true).get(); + long usedDiskSpaceAfterIndexing = stats.getTotal().getStore().sizeInBytes(); + // restrict the total disk space such that the next merge does not have sufficient disk space + long insufficientTotalDiskSpace = usedDiskSpaceAfterIndexing + MERGE_DISK_HIGH_WATERMARK_BYTES - randomLongBetween(1L, 10L); + setTotalSpace(node, insufficientTotalDiskSpace); + // node stats' FS stats should report that there is insufficient disk space available + assertBusy(() -> { + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setFs(true).get(); + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + assertThat(nodeStats.getFs().getTotal().getTotal().getBytes(), equalTo(insufficientTotalDiskSpace)); + assertThat(nodeStats.getFs().getTotal().getAvailable().getBytes(), lessThan(MERGE_DISK_HIGH_WATERMARK_BYTES)); + }); + int indexingRounds = randomIntBetween(2, 5); + while (indexingRounds-- > 0) { + indexRandom( + true, + true, + true, + false, + IntStream.range(1, randomIntBetween(2, 5)) + .mapToObj(i -> prepareIndex(indexName).setSource("field", randomAlphaOfLength(50))) + .toList() + ); + } + // the max segments argument makes it a blocking call + ActionFuture forceMergeFuture = indicesAdmin().prepareForceMerge(indexName).setMaxNumSegments(1).execute(); + assertBusy(() -> { + // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued + 
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1)); + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + // telemetry says that there are indeed some segments enqueued to be merged + testTelemetryPlugin.collect(); + assertThat( + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), + greaterThan(0L) + ); + // but still no merges are currently running + assertThat( + testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), + equalTo(0L) + ); + // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running") + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get(); + long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); + assertThat(currentMergeCount, equalTo(0L)); + }); + // the force merge call is still blocked + assertFalse(forceMergeFuture.isCancelled()); + assertFalse(forceMergeFuture.isDone()); + // merge executor still confirms merging is blocked due to insufficient disk space + assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); + // make disk space available in order to unblock the merge + if (randomBoolean()) { + setTotalSpace(node, Long.MAX_VALUE); + } else { + updateClusterSettings( + Settings.builder().put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING.getKey(), "0b") + ); + } + // wait for the merge call to return + safeGet(forceMergeFuture); + IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get(); + testTelemetryPlugin.collect(); + // assert index stats and telemetry report no merging in progress (after force merge returned) + long currentMergeCount = 
indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); + assertThat(currentMergeCount, equalTo(0L)); + assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L)); + assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L)); + // but some merging took place (there might have been other merges automatically triggered before the force merge call) + long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal(); + assertThat(totalMergeCount, greaterThan(0L)); + assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L)); + // assert there's a single segment after the force merge + List shardSegments = getShardSegments(indexName); + assertThat(shardSegments.size(), equalTo(1)); + assertThat(shardSegments.get(0).getSegments().size(), equalTo(1)); + assertAcked(indicesAdmin().prepareDelete(indexName).get()); + } + public void setTotalSpace(String dataNodeName, long totalSpace) { getTestFileStore(dataNodeName).setTotalSpace(totalSpace); refreshClusterInfo(); } + + private TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) { + var plugin = internalCluster().getInstance(PluginsService.class, dataNodeName) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + plugin.resetMeter(); + return plugin; + } } From a98d99e002ba86d053e5215e07ff1938c191d61a Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Mon, 30 Jun 2025 13:08:29 +0300 Subject: [PATCH 2/4] merge telemetry metrics don't exist in 9.0 --- .../index/engine/MergeWithLowDiskSpaceIT.java | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java 
b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 7a76872c27a54..6034123b154a1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -22,15 +22,12 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.BeforeClass; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Locale; import java.util.stream.IntStream; @@ -53,13 +50,6 @@ public static void setAvailableDiskSpaceBufferLimit() { MERGE_DISK_HIGH_WATERMARK_BYTES = randomLongBetween(10_000_000L, 20_000_000L); } - @Override - protected Collection> nodePlugins() { - List> nodePluginsList = new ArrayList<>(super.nodePlugins()); - nodePluginsList.add(TestTelemetryPlugin.class); - return nodePluginsList; - } - @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { return Settings.builder() @@ -217,17 +207,6 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception { // merge executor says merging is blocked due to insufficient disk space while there is a single merge task enqueued assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), equalTo(1)); assertTrue(threadPoolMergeExecutorService.isMergingBlockedDueToInsufficientDiskSpace()); - // telemetry says that there are indeed some segments enqueued to be merged - testTelemetryPlugin.collect(); - assertThat( - 
testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), - greaterThan(0L) - ); - // but still no merges are currently running - assertThat( - testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), - equalTo(0L) - ); // indices stats also says that no merge is currently running (blocked merges are NOT considered as "running") IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats(indexName).setMerge(true).get(); long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); @@ -253,12 +232,9 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception { // assert index stats and telemetry report no merging in progress (after force merge returned) long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); assertThat(currentMergeCount, equalTo(0L)); - assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_QUEUED_USAGE).getLast().getLong(), equalTo(0L)); - assertThat(testTelemetryPlugin.getLongGaugeMeasurement(MergeMetrics.MERGE_SEGMENTS_RUNNING_USAGE).getLast().getLong(), equalTo(0L)); // but some merging took place (there might have been other merges automatically triggered before the force merge call) long totalMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getTotal(); assertThat(totalMergeCount, greaterThan(0L)); - assertThat(testTelemetryPlugin.getLongCounterMeasurement(MergeMetrics.MERGE_DOCS_TOTAL).getLast().getLong(), greaterThan(0L)); // assert there's a single segment after the force merge List shardSegments = getShardSegments(indexName); assertThat(shardSegments.size(), equalTo(1)); From 102fe14cdcde411913f69bd83c34a09c6613e1c7 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Mon, 30 Jun 2025 14:01:33 +0300 Subject: [PATCH 3/4] Fix MergeWithLowDiskSpaceIT 
post telemetry plugin removal --- .../index/engine/MergeWithLowDiskSpaceIT.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 6034123b154a1..365713352a4a3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -168,7 +168,6 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception { setTotalSpace(node, Long.MAX_VALUE); ThreadPoolMergeExecutorService threadPoolMergeExecutorService = internalCluster().getInstance(IndicesService.class, node) .getThreadPoolMergeExecutorService(); - TestTelemetryPlugin testTelemetryPlugin = getTelemetryPlugin(node); // create some index final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createIndex( @@ -228,7 +227,6 @@ public void testForceMergeIsBlockedThenUnblocked() throws Exception { // wait for the merge call to return safeGet(forceMergeFuture); IndicesStatsResponse indicesStatsResponse = indicesAdmin().prepareStats(indexName).setMerge(true).get(); - testTelemetryPlugin.collect(); // assert index stats and telemetry report no merging in progress (after force merge returned) long currentMergeCount = indicesStatsResponse.getIndices().get(indexName).getPrimaries().merge.getCurrent(); assertThat(currentMergeCount, equalTo(0L)); @@ -246,13 +244,4 @@ public void setTotalSpace(String dataNodeName, long totalSpace) { getTestFileStore(dataNodeName).setTotalSpace(totalSpace); refreshClusterInfo(); } - - private TestTelemetryPlugin getTelemetryPlugin(String dataNodeName) { - var plugin = internalCluster().getInstance(PluginsService.class, dataNodeName) - .filterPlugins(TestTelemetryPlugin.class) - .findFirst() - .orElseThrow(); - 
plugin.resetMeter(); - return plugin; - } } From 8ffff1041b1b87255fd1f41ae5a450786103c787 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 30 Jun 2025 11:09:37 +0000 Subject: [PATCH 4/4] [CI] Auto commit changes from spotless --- .../org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java index 365713352a4a3..9c3eae66d7b6f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/MergeWithLowDiskSpaceIT.java @@ -22,8 +22,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.BeforeClass;