Skip to content

Don't use ThrottledTaskRunner for async cache evictions #129458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public List<Setting<?>> getSettings() {
SharedBlobCacheService.SHARED_CACHE_DECAY_INTERVAL_SETTING,
SharedBlobCacheService.SHARED_CACHE_MIN_TIME_DELTA_SETTING,
SharedBlobCacheService.SHARED_CACHE_MMAP,
SharedBlobCacheService.SHARED_CACHE_COUNT_READS,
SharedBlobCacheService.SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING
SharedBlobCacheService.SHARED_CACHE_COUNT_READS
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.Nullable;
Expand All @@ -53,11 +52,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -98,13 +99,6 @@ public class SharedBlobCacheService<KeyType> implements Releasable {
Setting.Property.NodeScope
);

public static final Setting<Integer> SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING = Setting.intSetting(
SHARED_CACHE_SETTINGS_PREFIX + "concurrent_evictions",
5,
1,
Setting.Property.NodeScope
);

private static Setting.Validator<ByteSizeValue> getPageSizeAlignedByteSizeValueValidator(String settingName) {
return value -> {
if (value.getBytes() == -1) {
Expand Down Expand Up @@ -338,7 +332,7 @@ private CacheEntry(T chunk) {
private final Runnable evictIncrementer;

private final LongSupplier relativeTimeInNanosSupplier;
private final ThrottledTaskRunner asyncEvictionsRunner;
private final ExecutorService asyncEvictionsExecutor;

public SharedBlobCacheService(
NodeEnvironment environment,
Expand Down Expand Up @@ -399,11 +393,7 @@ public SharedBlobCacheService(
this.blobCacheMetrics = blobCacheMetrics;
this.evictIncrementer = blobCacheMetrics.getEvictedCountNonZeroFrequency()::increment;
this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
this.asyncEvictionsRunner = new ThrottledTaskRunner(
"shared_blob_cache_evictions",
SHARED_CACHE_CONCURRENT_EVICTIONS_SETTING.get(settings),
threadPool.generic()
);
this.asyncEvictionsExecutor = threadPool.generic();
}

public static long calculateCacheSize(Settings settings, long totalFsSize) {
Expand Down Expand Up @@ -1590,6 +1580,8 @@ void touch() {
private final DecayAndNewEpochTask decayAndNewEpochTask;

private final AtomicLong epoch = new AtomicLong();
private final Queue<Predicate<KeyType>> evictionQueue = new ConcurrentLinkedQueue<>();
private final AtomicInteger evictionRunnerActive = new AtomicInteger(0);

@SuppressWarnings("unchecked")
LFUCache(Settings settings) {
Expand Down Expand Up @@ -1671,22 +1663,20 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {

@Override
public void forceEvictAsync(Predicate<KeyType> cacheKeyPredicate) {
asyncEvictionsRunner.enqueueTask(new ActionListener<>() {
@Override
public void onResponse(Releasable releasable) {
try (releasable) {
forceEvict(cacheKeyPredicate);
}
}
evictionQueue.add(cacheKeyPredicate);
startRunnerIfIdle();
}

@Override
public void onFailure(Exception e) {
// should be impossible, GENERIC pool doesn't reject anything
final String message = "unexpected failure evicting from shared blob cache";
logger.error(message, e);
assert false : new AssertionError(message, e);
}
});
private void startRunnerIfIdle() {
if (evictionRunnerActive.getAndIncrement() == 0) {
asyncEvictionsExecutor.submit(() -> {
int evictionRunnerValue = -1;
while (evictionQueue.isEmpty() == false || evictionRunnerActive.compareAndSet(evictionRunnerValue, 0) == false) {
forceEvict(evictionQueue.poll());
evictionRunnerValue = evictionRunnerActive.get();
}
});
}
}

private LFUCacheEntry initChunk(LFUCacheEntry entry) {
Expand Down