Skip to content

Implements two stage Caching for ExecutionManager. Hard references, b… #3760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package com.bakdata.conquery.models.config;

import com.bakdata.conquery.util.validation.ValidCaffeineSpec;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.dropwizard.util.Duration;
import io.dropwizard.validation.ValidationMethod;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Getter @Setter @ToString
@Getter
@Setter
@ToString
public class QueryConfig {

private ThreadPoolDefinition executionPool = new ThreadPoolDefinition();
Expand All @@ -19,4 +24,11 @@ public class QueryConfig {
* TODO Implement global limit of active secondaryId sub plans
*/
private int secondaryIdSubPlanRetention = 15;

@ValidCaffeineSpec(softValue=true)
private String L2CacheSpec = "softValues";
@ValidCaffeineSpec
private String L1CacheSpec = "expireAfterAccess=10m";


}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.bakdata.conquery.models.query;

import static com.bakdata.conquery.models.execution.ExecutionState.RUNNING;

import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -24,11 +25,13 @@
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.Namespace;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;

Expand All @@ -40,34 +43,44 @@ public abstract class ExecutionManager {
private final DatasetRegistry<?> datasetRegistry;
private final ConqueryConfig config;

/**
* Cache for running and recent execution infos.
*/
private final Cache<ManagedExecutionId, ExecutionInfo> executionInfos =
CacheBuilder.newBuilder()
.softValues()
.removalListener(this::executionRemoved)
.build();
@Getter(AccessLevel.NONE)
private final Cache<ManagedExecutionId, ExecutionInfo> executionInfosL2;
@Getter(AccessLevel.NONE)
private final Cache<ManagedExecutionId, ExecutionInfo> executionInfosL1;

public ExecutionManager(MetaStorage storage, DatasetRegistry<?> datasetRegistry, ConqueryConfig config) {
this.storage = storage;
this.datasetRegistry = datasetRegistry;
this.config = config;

executionInfosL2 = Caffeine.from(config.getQueries().getL2CacheSpec())
.removalListener(this::evictionSoft)
.build();
executionInfosL1 = Caffeine.from(config.getQueries().getL1CacheSpec())
.removalListener(this::evictionHard)
.build();
}

/**
* Logs the eviction of Query results from the soft cache.
*/
private void executionRemoved(RemovalNotification<ManagedExecutionId, ExecutionInfo> removalNotification) {
private void evictionSoft(ManagedExecutionId executionId, ExecutionInfo resultInfo, RemovalCause cause) {

// If removal was done manually we assume it was also handled properly
if (!removalNotification.wasEvicted()) {
if (!cause.wasEvicted()) {
return;
}

final ManagedExecutionId executionId = removalNotification.getKey();

log.trace("Evicted Results for Query[{}] (Reason: {})", executionId, removalNotification.getCause());

final ManagedExecution execution = getExecution(executionId);
log.trace("Evicted Query[{}] results from soft cache (Reason: {})", executionId, cause);
}

// The query might already be deleted
if (execution != null) {
reset(executionId);
/**
 * Removal listener of the L1 cache: traces entries that were evicted.
 * Removals that are not evictions are ignored here.
 */
private void evictionHard(ManagedExecutionId executionId, ExecutionInfo resultInfo, RemovalCause cause) {
	// Anything that is not an eviction was removed manually and is assumed to be handled by the caller.
	if (cause.wasEvicted()) {
		log.trace("Evicted Query[{}] results from hard cache (Reason: {})", executionId, cause);
	}
}

public ManagedExecution getExecution(ManagedExecutionId execution) {
Expand All @@ -84,30 +97,34 @@ public void reset(ManagedExecutionId id) {
}

public boolean isResultPresent(ManagedExecutionId id) {
return executionInfos.getIfPresent(id) != null;
return tryGetExecutionInfo(id).isPresent();
}

public void clearQueryResults(ManagedExecutionId execution) {
executionInfos.invalidate(execution);
executionInfosL2.invalidate(execution);
executionInfosL1.invalidate(execution);
}

/**
* Returns the state or throws a NoSuchElementException if no state was found.
*/
public <R extends ExecutionInfo> R getExecutionInfo(ManagedExecutionId id) {
ExecutionInfo executionInfo = executionInfos.getIfPresent(id);
if (executionInfo == null) {
throw new NoSuchElementException("No execution found for %s".formatted(id));
public <R extends ExecutionInfo> Optional<R> tryGetExecutionInfo(ManagedExecutionId id) {
// Access hard before soft, to keep things "touched"
ExecutionInfo maybeInfo = executionInfosL1.getIfPresent(id);

if (maybeInfo != null) {
return Optional.of((R) maybeInfo);
}
return (R) executionInfo;
}

public <R extends ExecutionInfo> Optional<R> tryGetExecutionInfo(ManagedExecutionId id) {
return Optional.ofNullable((R) executionInfos.getIfPresent(id));
maybeInfo = executionInfosL2.getIfPresent(id);

if (maybeInfo != null) {
return Optional.of((R) maybeInfo);
}

return Optional.empty();
}

public void addState(ManagedExecutionId id, ExecutionInfo result) {
executionInfos.put(id, result);
executionInfosL1.put(id, result);
executionInfosL2.put(id, result);
}

public final ManagedExecution runQuery(Namespace namespace, QueryDescription query, UserId user, boolean system) {
Expand Down Expand Up @@ -182,60 +199,77 @@ public final void cancelExecution(final ManagedExecution execution) {
externalExecution.cancel();
return;
}
executionInfos.invalidate(execution.getId());

clearQueryResults(execution.getId());

doCancelQuery(execution.getId());
}

public abstract void doCancelQuery(ManagedExecutionId managedExecutionId);

public void updateState(ManagedExecutionId id, ExecutionState execState) {
ExecutionInfo executionInfo = executionInfos.getIfPresent(id);
if (executionInfo != null) {
executionInfo.setExecutionState(execState);
return;
}
Optional<ExecutionInfo> executionInfo = tryGetExecutionInfo(id);

log.warn("Could not update execution executionInfo of {} to {}, because it had no executionInfo.", id, execState);
executionInfo.ifPresentOrElse(info -> info.setExecutionState(execState),
() -> log.warn("Could not update execution executionInfo of {} to {}, because it had no executionInfo.", id, execState)
);
}

public <E extends ManagedExecution & InternalExecution> Stream<EntityResult> streamQueryResults(E execution) {
final InternalExecutionInfo resultParts = (InternalExecutionInfo) executionInfos.getIfPresent(execution.getId());

return resultParts == null
? Stream.empty()
: resultParts.streamQueryResults();
Optional<InternalExecutionInfo> maybeInfo = tryGetExecutionInfo(execution.getId());

return maybeInfo.map(InternalExecutionInfo::streamQueryResults).orElseGet(Stream::empty);
}

public void clearBarrier(ManagedExecutionId id) {
ExecutionInfo result = Objects.requireNonNull(executionInfos.getIfPresent(id), "Cannot clear lock on absent execution result");
ExecutionInfo executionInfo = getExecutionInfo(id);

executionInfo.getExecutingLock().countDown();
}

/**
* Returns the state or throws a NoSuchElementException if no state was found.
*/
public <R extends ExecutionInfo> R getExecutionInfo(ManagedExecutionId id) {
Optional<R> maybeInfo = tryGetExecutionInfo(id);

if (maybeInfo.isPresent()) {
return maybeInfo.get();
}

result.getExecutingLock().countDown();
throw new NoSuchElementException("Could not find Execution %s".formatted(id));
}

/**
* Blocks until the execution has finished or the specified timeout is reached. Returns immediately if the execution is not running.
*/
public ExecutionState awaitDone(ManagedExecutionId id, int time, TimeUnit unit) {

ExecutionInfo executionInfo = executionInfos.getIfPresent(id);
if (executionInfo == null) {
Optional<ExecutionInfo> maybeExecutionInfo = tryGetExecutionInfo(id);

if (maybeExecutionInfo.isEmpty()) {
return ExecutionState.NEW;
}

ExecutionInfo executionInfo = maybeExecutionInfo.get();
ExecutionState execState = executionInfo.getExecutionState();

if (execState != ExecutionState.RUNNING) {
if (execState != RUNNING) {
return execState;
}

Uninterruptibles.awaitUninterruptibly(executionInfo.getExecutingLock(), time, unit);

ExecutionInfo executionInfoAfterWait = executionInfos.getIfPresent(id);
if (executionInfoAfterWait == null) {
return ExecutionState.NEW;
Optional<ExecutionInfo> maybeExecutionInfoAfterWait = tryGetExecutionInfo(id);
return maybeExecutionInfoAfterWait.map(ExecutionInfo::getExecutionState).orElse(ExecutionState.NEW);
}

public boolean hasRunningQueries() {
if (executionInfosL2.asMap().values().stream().map(ExecutionInfo::getExecutionState).anyMatch(RUNNING::equals)) {
return true;
}
return executionInfoAfterWait.getExecutionState();

return executionInfosL1.asMap().values().stream().map(ExecutionInfo::getExecutionState).anyMatch(RUNNING::equals);
}

/**
Expand All @@ -246,8 +280,7 @@ public interface ExecutionInfo {
/**
* The current {@link ExecutionState} of the execution.
*/
@NotNull
ExecutionState getExecutionState();
@NotNull ExecutionState getExecutionState();

void setExecutionState(ExecutionState state);

Expand All @@ -264,6 +297,4 @@ public interface InternalExecutionInfo extends ExecutionInfo {
}




}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.bakdata.conquery.models.auth.entities.User;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.datasets.Dataset;
import com.bakdata.conquery.models.execution.ExecutionState;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.identifiable.ids.specific.UserId;
import com.bakdata.conquery.models.query.ExecutionManager;
Expand Down Expand Up @@ -206,9 +205,7 @@ private boolean isBusy() {

busy |= standaloneCommand.getManager().getDatasetRegistry().getNamespaces().stream()
.map(Namespace::getExecutionManager)
.flatMap(e -> e.getExecutionInfos().asMap().values().stream())
.map(ExecutionManager.ExecutionInfo::getExecutionState)
.anyMatch(ExecutionState.RUNNING::equals);
.anyMatch(ExecutionManager::hasRunningQueries);

for (Namespace namespace : standaloneCommand.getManagerNode().getDatasetRegistry().getNamespaces()) {
busy |= namespace.getJobManager().isSlowWorkerBusy();
Expand Down
Loading