diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java b/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java index 1cb33d7dfa..9538ecfa3c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/QueryConfig.java @@ -1,11 +1,16 @@ package com.bakdata.conquery.models.config; +import com.bakdata.conquery.util.validation.ValidCaffeineSpec; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.dropwizard.util.Duration; +import io.dropwizard.validation.ValidationMethod; import lombok.Getter; import lombok.Setter; import lombok.ToString; -@Getter @Setter @ToString +@Getter +@Setter +@ToString public class QueryConfig { private ThreadPoolDefinition executionPool = new ThreadPoolDefinition(); @@ -19,4 +24,11 @@ public class QueryConfig { * TODO Implement global limit of active secondaryId sub plans */ private int secondaryIdSubPlanRetention = 15; + + @ValidCaffeineSpec(softValue=true) + private String L2CacheSpec = "softValues"; + @ValidCaffeineSpec + private String L1CacheSpec = "expireAfterAccess=10m"; + + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java index 0ebaaba7ca..d037494df1 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/query/ExecutionManager.java @@ -1,7 +1,8 @@ package com.bakdata.conquery.models.query; +import static com.bakdata.conquery.models.execution.ExecutionState.RUNNING; + import java.util.NoSuchElementException; -import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -24,11 +25,13 @@ import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.Namespace; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalNotification; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; import com.google.common.util.concurrent.Uninterruptibles; +import lombok.AccessLevel; import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; @@ -40,34 +43,44 @@ public abstract class ExecutionManager { private final DatasetRegistry datasetRegistry; private final ConqueryConfig config; - /** - * Cache for running and recent execution infos. - */ - private final Cache executionInfos = - CacheBuilder.newBuilder() - .softValues() - .removalListener(this::executionRemoved) - .build(); + @Getter(AccessLevel.NONE) + private final Cache executionInfosL2; + @Getter(AccessLevel.NONE) + private final Cache executionInfosL1; + + public ExecutionManager(MetaStorage storage, DatasetRegistry datasetRegistry, ConqueryConfig config) { + this.storage = storage; + this.datasetRegistry = datasetRegistry; + this.config = config; + + executionInfosL2 = Caffeine.from(config.getQueries().getL2CacheSpec()) + .removalListener(this::evictionSoft) + .build(); + executionInfosL1 = Caffeine.from(config.getQueries().getL1CacheSpec()) + .removalListener(this::evictionHard) + .build(); + } /** * Manage state of evicted Queries, setting them to NEW. */ - private void executionRemoved(RemovalNotification removalNotification) { + private void evictionSoft(ManagedExecutionId executionId, ExecutionInfo resultInfo, RemovalCause cause) { + // If removal was done manually we assume it was also handled properly - if (!removalNotification.wasEvicted()) { + if (!cause.wasEvicted()) { return; } - final ManagedExecutionId executionId = removalNotification.getKey(); - - log.trace("Evicted Results for Query[{}] (Reason: {})", executionId, removalNotification.getCause()); - - final ManagedExecution execution = getExecution(executionId); + log.trace("Evicted Query[{}] results from soft cache (Reason: {})", executionId, cause); + } - // The query might already be deleted - if (execution != null) { - reset(executionId); + private void evictionHard(ManagedExecutionId executionId, ExecutionInfo resultInfo, RemovalCause cause) { + // If removal was done manually we assume it was also handled properly + if (!cause.wasEvicted()) { + return; } + + log.trace("Evicted Query[{}] results from hard cache (Reason: {})", executionId, cause); } public ManagedExecution getExecution(ManagedExecutionId execution) { @@ -84,30 +97,34 @@ public void reset(ManagedExecutionId id) { } public boolean isResultPresent(ManagedExecutionId id) { - return executionInfos.getIfPresent(id) != null; + return tryGetExecutionInfo(id).isPresent(); } public void clearQueryResults(ManagedExecutionId execution) { - executionInfos.invalidate(execution); + executionInfosL2.invalidate(execution); + executionInfosL1.invalidate(execution); } - /** - * Returns the state or throws an NoSuchElementException if no state was found. - */ - public R getExecutionInfo(ManagedExecutionId id) { - ExecutionInfo executionInfo = executionInfos.getIfPresent(id); - if (executionInfo == null) { - throw new NoSuchElementException("No execution found for %s".formatted(id)); + public Optional tryGetExecutionInfo(ManagedExecutionId id) { + // Access hard before soft, to keep things "touched" + ExecutionInfo maybeInfo = executionInfosL1.getIfPresent(id); + + if (maybeInfo != null) { + return Optional.of((R) maybeInfo); } - return (R) executionInfo; - } - public Optional tryGetExecutionInfo(ManagedExecutionId id) { - return Optional.ofNullable((R) executionInfos.getIfPresent(id)); + maybeInfo = executionInfosL2.getIfPresent(id); + + if (maybeInfo != null) { + return Optional.of((R) maybeInfo); + } + + return Optional.empty(); } public void addState(ManagedExecutionId id, ExecutionInfo result) { - executionInfos.put(id, result); + executionInfosL1.put(id, result); + executionInfosL2.put(id, result); } public final ManagedExecution runQuery(Namespace namespace, QueryDescription query, UserId user, boolean system) { @@ -182,35 +199,45 @@ public final void cancelExecution(final ManagedExecution execution) { externalExecution.cancel(); return; } - executionInfos.invalidate(execution.getId()); + + clearQueryResults(execution.getId()); + doCancelQuery(execution.getId()); } public abstract void doCancelQuery(ManagedExecutionId managedExecutionId); public void updateState(ManagedExecutionId id, ExecutionState execState) { - ExecutionInfo executionInfo = executionInfos.getIfPresent(id); - if (executionInfo != null) { - executionInfo.setExecutionState(execState); - return; - } + Optional executionInfo = tryGetExecutionInfo(id); - log.warn("Could not update execution executionInfo of {} to {}, because it had no executionInfo.", id, execState); + executionInfo.ifPresentOrElse(info -> info.setExecutionState(execState), + () -> log.warn("Could not update execution executionInfo of {} to {}, because it had no executionInfo.", id, execState) + ); } public Stream streamQueryResults(E execution) { - final InternalExecutionInfo resultParts = (InternalExecutionInfo) executionInfos.getIfPresent(execution.getId()); - - return resultParts == null - ? Stream.empty() - : resultParts.streamQueryResults(); + Optional maybeInfo = tryGetExecutionInfo(execution.getId()); + return maybeInfo.map(InternalExecutionInfo::streamQueryResults).orElseGet(Stream::empty); } public void clearBarrier(ManagedExecutionId id) { - ExecutionInfo result = Objects.requireNonNull(executionInfos.getIfPresent(id), "Cannot clear lock on absent execution result"); + ExecutionInfo executionInfo = getExecutionInfo(id); + + executionInfo.getExecutingLock().countDown(); + } + + /** + * Returns the state or throws an NoSuchElementException if no state was found. + */ + public R getExecutionInfo(ManagedExecutionId id) { + Optional maybeInfo = tryGetExecutionInfo(id); + + if (maybeInfo.isPresent()) { + return maybeInfo.get(); + } - result.getExecutingLock().countDown(); + throw new NoSuchElementException("Could not find Execution %s".formatted(id)); } /** @@ -218,24 +245,31 @@ public void clearBarrier(ManagedExecutionId id) { */ public ExecutionState awaitDone(ManagedExecutionId id, int time, TimeUnit unit) { - ExecutionInfo executionInfo = executionInfos.getIfPresent(id); - if (executionInfo == null) { + Optional maybeExecutionInfo = tryGetExecutionInfo(id); + + if (maybeExecutionInfo.isEmpty()) { return ExecutionState.NEW; } + ExecutionInfo executionInfo = maybeExecutionInfo.get(); ExecutionState execState = executionInfo.getExecutionState(); - if (execState != ExecutionState.RUNNING) { + if (execState != RUNNING) { return execState; } Uninterruptibles.awaitUninterruptibly(executionInfo.getExecutingLock(), time, unit); - ExecutionInfo executionInfoAfterWait = executionInfos.getIfPresent(id); - if (executionInfoAfterWait == null) { - return ExecutionState.NEW; + Optional maybeExecutionInfoAfterWait = tryGetExecutionInfo(id); + return maybeExecutionInfoAfterWait.map(ExecutionInfo::getExecutionState).orElse(ExecutionState.NEW); + } + + public boolean hasRunningQueries() { + if (executionInfosL2.asMap().values().stream().map(ExecutionInfo::getExecutionState).anyMatch(RUNNING::equals)) { + return true; } - return executionInfoAfterWait.getExecutionState(); + + return executionInfosL1.asMap().values().stream().map(ExecutionInfo::getExecutionState).anyMatch(RUNNING::equals); } /** @@ -246,8 +280,7 @@ public interface ExecutionInfo { /** * The current {@link ExecutionState} of the execution. */ - @NotNull - ExecutionState getExecutionState(); + @NotNull ExecutionState getExecutionState(); void setExecutionState(ExecutionState state); @@ -264,6 +297,4 @@ public interface InternalExecutionInfo extends ExecutionInfo { } - - } diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java index 3b6cdf45ff..91a2c58dcf 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java @@ -29,7 +29,6 @@ import com.bakdata.conquery.models.auth.entities.User; import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.UserId; import com.bakdata.conquery.models.query.ExecutionManager; @@ -206,9 +205,7 @@ private boolean isBusy() { busy |= standaloneCommand.getManager().getDatasetRegistry().getNamespaces().stream() .map(Namespace::getExecutionManager) - .flatMap(e -> e.getExecutionInfos().asMap().values().stream()) - .map(ExecutionManager.ExecutionInfo::getExecutionState) - .anyMatch(ExecutionState.RUNNING::equals); + .anyMatch(ExecutionManager::hasRunningQueries); for (Namespace namespace : standaloneCommand.getManagerNode().getDatasetRegistry().getNamespaces()) { busy |= namespace.getJobManager().isSlowWorkerBusy();