From 0be586802788aa48f9abdbdde90f26a7dd07a53e Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 16 Jun 2025 14:46:55 +1000 Subject: [PATCH 1/2] Don't use ThrottledTaskRunner for async cache evictions --- .../blobcache/BlobCachePlugin.java | 3 +- .../shared/SharedBlobCacheService.java | 47 +++++++------------ 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java index 64d4c8d4dc511..4f9ac3eb99348 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/BlobCachePlugin.java @@ -27,8 +27,7 @@ public List> getSettings() { SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING, SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING, SharedBlobCacheService.SHARED_CACHE_MMAP, - SharedBlobCacheService.SHARED_CACHE_COUNT_READS, - SharedBlobCacheService.SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING + SharedBlobCacheService.SHARED_CACHE_COUNT_READS ); } } diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 4a202562e5e3d..74014cce2c7e9 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.RelativeByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Nullable; @@ -53,12 +52,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -98,13 +100,6 @@ public class SharedBlobCacheService implements Releasable { Setting.Property.NodeScope ); - public static final Setting SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING = Setting.intSetting( - SHARED_CACHE_SETTINGS_PREFIX + "concurrent_evictions", - 5, - 1, - Setting.Property.NodeScope - ); - private static Setting.Validator getPageSizeAlignedByteSizeValueValidator(String settingName) { return value -> { if (value.getBytes() == -1) { @@ -338,7 +333,7 @@ private CacheEntry(T chunk) { private final Runnable evictIncrementer; private final LongSupplier relativeTimeInNanosSupplier; - private final ThrottledTaskRunner asyncEvictionsRunner; + private final ExecutorService asyncEvictionsExecutor; public SharedBlobCacheService( NodeEnvironment environment, @@ -399,11 +394,7 @@ public SharedBlobCacheService( this.blobCacheMetrics = blobCacheMetrics; this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment; this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier; - this.asyncEvictionsRunner = new ThrottledTaskRunner( - "shared_blob_cache_evictions", - SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING.get(settings), - threadPool.generic() - ); + this.asyncEvictionsExecutor = threadPool.generic(); } public static long calculateCacheSize(Settings settings, long totalFsSize) { @@ -1590,6 +1581,8 @@ void touch() { private final DecayAndNewEpochTask decayAndNewEpochTask; private final AtomicLong epoch = new AtomicLong(); + private final Queue> evictionQueue = new ConcurrentLinkedQueue<>(); + private final AtomicBoolean evictionRunnerActive = new AtomicBoolean(false); @SuppressWarnings("unchecked") LFUCache(Settings settings) { @@ -1671,22 +1664,18 @@ public int forceEvict(Predicate cacheKeyPredicate) { @Override public void forceEvictAsync(Predicate cacheKeyPredicate) { - asyncEvictionsRunner.enqueueTask(new ActionListener<>() { - @Override - public void onResponse(Releasable releasable) { - try (releasable) { - forceEvict(cacheKeyPredicate); - } - } + evictionQueue.add(cacheKeyPredicate); + startRunnerIfIdle(); + } - @Override - public void onFailure(Exception e) { - // should be impossible, GENERIC pool doesn't reject anything - final String message = "unexpected failure evicting from shared blob cache"; - logger.error(message, e); - assert false : new AssertionError(message, e); - } - }); + private void startRunnerIfIdle() { + if (evictionRunnerActive.compareAndSet(false, true)) { + asyncEvictionsExecutor.submit(() -> { + while (evictionQueue.isEmpty() == false || evictionRunnerActive.compareAndSet(true, false) == false) { + forceEvict(evictionQueue.poll()); + } + }); + } } private LFUCacheEntry initChunk(LFUCacheEntry entry) { From 15fd785cf76e7d7d1827651d35fce5cabf4801b6 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Mon, 16 Jun 2025 15:01:51 +1000 Subject: [PATCH 2/2] Fix race condition --- .../blobcache/shared/SharedBlobCacheService.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 74014cce2c7e9..d74ade1783fe2 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -60,7 +60,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; @@ -1582,7 +1581,7 @@ void touch() { private final AtomicLong epoch = new AtomicLong(); private final Queue> evictionQueue = new ConcurrentLinkedQueue<>(); - private final AtomicBoolean evictionRunnerActive = new AtomicBoolean(false); + private final AtomicInteger evictionRunnerActive = new AtomicInteger(0); @SuppressWarnings("unchecked") LFUCache(Settings settings) { @@ -1669,10 +1668,12 @@ public void forceEvictAsync(Predicate cacheKeyPredicate) { } private void startRunnerIfIdle() { - if (evictionRunnerActive.compareAndSet(false, true)) { + if (evictionRunnerActive.getAndIncrement() == 0) { asyncEvictionsExecutor.submit(() -> { - while (evictionQueue.isEmpty() == false || evictionRunnerActive.compareAndSet(true, false) == false) { + int evictionRunnerValue = -1; + while (evictionQueue.isEmpty() == false || evictionRunnerActive.compareAndSet(evictionRunnerValue, 0) == false) { forceEvict(evictionQueue.poll()); + evictionRunnerValue = evictionRunnerActive.get(); } }); }