First draft of loading ShardWorkers using JobManager and Lifecycle Ma… #3708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: wants to merge 1 commit into develop
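For orientation: the change moves worker-storage loading out of ShardNode#run and into the Dropwizard lifecycle. ShardWorkers, already a Managed component, now discovers the worker storages in start() and defers the actual loading to each worker's JobManager. The following is a minimal sketch of the Dropwizard Managed contract this relies on; only the Dropwizard types and methods are real, the class name StorageLoadingComponent is a placeholder.

import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.Managed;

// Placeholder component illustrating the lifecycle hooks ShardWorkers implements in this PR.
public class StorageLoadingComponent implements Managed {

	@Override
	public void start() throws Exception {
		// Called by Dropwizard once the application starts;
		// the PR discovers worker storages and queues loading jobs here.
	}

	@Override
	public void stop() throws Exception {
		// Called by Dropwizard on shutdown;
		// the PR shuts the shared jobs thread pool down here.
	}
}

// Registration inside a bundle or command, mirroring lifecycle.manage(workers) in the diff below:
// environment.lifecycle().manage(new StorageLoadingComponent());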
@@ -1,19 +1,11 @@
package com.bakdata.conquery.commands;

import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.mode.cluster.ClusterConnectionShard;
import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
import com.bakdata.conquery.models.config.ConqueryConfig;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.models.worker.Worker;
import com.bakdata.conquery.tasks.LoadStorageTask;
import com.bakdata.conquery.util.io.ConqueryMDC;
import io.dropwizard.core.ConfiguredBundle;
import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
@@ -56,7 +48,9 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
config.getQueries().getExecutionPool(),
internalMapperFactory,
config.getCluster().getEntityBucketSize(),
config.getQueries().getSecondaryIdSubPlanRetention()
config.getQueries().getSecondaryIdSubPlanRetention(),
config.getStorage(),
config.isFailOnError()
);

lifecycle.manage(workers);
Expand All @@ -67,40 +61,9 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
new ClusterConnectionShard(config, environment, workers, internalMapperFactory);

lifecycle.manage(clusterConnection);

final Collection<? extends WorkerStorage> workerStorages = config.getStorage().discoverWorkerStorages();


ExecutorService loaders = config.getQueries().getExecutionPool().createService("Worker loader");

Queue<Worker> workersDone = new ConcurrentLinkedQueue<>();
for (WorkerStorage workerStorage : workerStorages) {
loaders.submit(() -> {
try {
workersDone.add(workers.createWorker(workerStorage, config.isFailOnError(), config.getStorage().isLoadStoresOnStart()));
}
catch (Exception e) {
log.error("Failed reading Storage", e);
}
finally {
log.debug("DONE reading Storage {}", workerStorage);
ConqueryMDC.clearLocation();
}
});
}

loaders.shutdown();
while (!loaders.awaitTermination(1, TimeUnit.MINUTES)) {

log.debug("Waiting for Worker workers to load. {} are already finished. {} pending", workersDone.size(), workerStorages.size()
- workersDone.size());
}

log.info("All Worker loaded: {}", workers.getWorkers().size());
}



public boolean isBusy() {
return clusterConnection.isBusy() || workers.isBusy();
}
@@ -27,7 +27,7 @@ public void react(ShardNodeNetworkContext context) throws Exception {
log.info("creating a new worker for {}", dataset);
ConqueryConfig config = context.getConfig();

Worker worker = context.getWorkers().createWorker(dataset, config.getStorage(), createWorkerName(), config.isFailOnError());
Worker worker = context.getWorkers().newWorker(dataset, createWorkerName(), config.isFailOnError());

worker.setSession(context.getSession());

@@ -5,7 +5,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import com.bakdata.conquery.commands.ShardNode;
import com.bakdata.conquery.io.jackson.MutableInjectableValues;
Expand All @@ -19,6 +18,7 @@
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.bakdata.conquery.models.jobs.SimpleJob;
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dropwizard.lifecycle.Managed;
Expand All @@ -45,10 +45,17 @@ public class ShardWorkers implements NamespacedStorageProvider, Managed {
private final InternalMapperFactory internalMapperFactory;
private final int entityBucketSize;
private final int secondaryIdSubPlanRetention;
private final AtomicInteger nextWorker = new AtomicInteger(0);
private final StoreFactory storeFactory;
private final boolean failOnError;


public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) {

public ShardWorkers(
ThreadPoolDefinition queryThreadPoolDefinition,
InternalMapperFactory internalMapperFactory,
int entityBucketSize,
int secondaryIdSubPlanRetention,
StoreFactory storeFactory,
boolean failOnError) {
this.queryThreadPoolDefinition = queryThreadPoolDefinition;

// TODO This shouldn't be coupled to the query thread pool definition
Expand All @@ -57,39 +64,36 @@ public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapp
this.internalMapperFactory = internalMapperFactory;
this.entityBucketSize = entityBucketSize;
this.secondaryIdSubPlanRetention = secondaryIdSubPlanRetention;
this.storeFactory = storeFactory;
this.failOnError = failOnError;

jobsThreadPool.prestartAllCoreThreads();
}

public Worker createWorker(WorkerStorage storage, boolean failOnError, boolean loadStorage) {

final ObjectMapper persistenceMapper = internalMapperFactory.createWorkerPersistenceMapper(storage);
public Worker newWorker(Dataset dataset, @NonNull String name, boolean failOnError) {

final Worker worker =
new Worker(queryThreadPoolDefinition, storage, jobsThreadPool, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanRetention, loadStorage);
final Worker worker = Worker.newWorker(
dataset,
queryThreadPoolDefinition,
jobsThreadPool,
storeFactory,
name,
failOnError,
entityBucketSize,
internalMapperFactory,
secondaryIdSubPlanRetention
);

addWorker(worker);
registerWorker(worker);

return worker;
}

private void addWorker(Worker worker) {
nextWorker.incrementAndGet();
private void registerWorker(Worker worker) {
workers.put(worker.getInfo().getId(), worker);
dataset2Worker.put(worker.getStorage().getDataset().getId(), worker);
}

public Worker createWorker(Dataset dataset, StoreFactory storageConfig, @NonNull String name, boolean failOnError) {

final Worker
worker =
Worker.newWorker(dataset, queryThreadPoolDefinition, jobsThreadPool, storageConfig, name, failOnError, entityBucketSize, internalMapperFactory, secondaryIdSubPlanRetention);

addWorker(worker);

return worker;
}

public Worker getWorker(WorkerId worker) {
return Objects.requireNonNull(workers.get(worker));
}
Expand All @@ -112,14 +116,14 @@ removed from dataset2Worker (which is used in deserialization of NamespacedIds,
try {
removed.remove();
}
catch(Exception e) {
catch (Exception e) {
log.error("Failed to remove storage {}", removed, e);
}
}

public boolean isBusy() {
for( Worker worker : workers.values()) {
if(worker.isBusy()) {
for (Worker worker : workers.values()) {
if (worker.isBusy()) {
return true;
}
}
Expand All @@ -128,12 +132,38 @@ public boolean isBusy() {

@Override
public void start() throws Exception {
final Collection<? extends WorkerStorage> workerStorages = storeFactory.discoverWorkerStorages();

for (WorkerStorage workerStorage : workerStorages) {
try {
final Worker worker = workerFromStorage(workerStorage);
worker.load(storeFactory.isLoadStoresOnStart());
}
catch (Exception e) {
log.error("Failed initializing Storage", e);
}
finally {
ConqueryMDC.clearLocation();
}
}

for (Worker value : getWorkers().values()) {
value.getJobManager().addSlowJob(new SimpleJob("Update Bucket Manager", value.getBucketManager()::fullUpdate));
}
}

private Worker workerFromStorage(WorkerStorage storage) {

final ObjectMapper persistenceMapper = internalMapperFactory.createWorkerPersistenceMapper(storage);

final Worker worker =
new Worker(queryThreadPoolDefinition, storage, jobsThreadPool, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanRetention);

registerWorker(worker);

return worker;
}

@Override
public void stop() {
jobsThreadPool.shutdown();
@@ -22,13 +22,16 @@
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage;
import com.bakdata.conquery.models.messages.network.MessageToManagerNode;
import com.bakdata.conquery.models.messages.network.NetworkMessage;
import com.bakdata.conquery.models.messages.network.specific.ForwardToNamespace;
import com.bakdata.conquery.models.query.QueryExecutor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import lombok.Data;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
@@ -62,21 +65,13 @@ public Worker(
boolean failOnError,
int entityBucketSize,
ObjectMapper persistenceMapper,
int secondaryIdSubPlanLimit,
boolean loadStorage
int secondaryIdSubPlanLimit
) {
this.storage = storage;
this.jobsExecutorService = jobsExecutorService;


storage.openStores(persistenceMapper);

storage.loadKeys();

if (loadStorage) {
storage.loadData();
}

jobManager = new JobManager(storage.getWorker().getName(), failOnError);
queryExecutor = new QueryExecutor(this, queryThreadPoolDefinition.createService("QueryExecutor %d"), secondaryIdSubPlanLimit);
bucketManager = BucketManager.create(this, storage, entityBucketSize);
Expand All @@ -94,14 +89,14 @@ public static Worker newWorker(
InternalMapperFactory internalMapperFactory,
int secondaryIdSubPlanLimit) {

WorkerStorageImpl workerStorage = new WorkerStorageImpl(config, directory);
final WorkerStorageImpl workerStorage = new WorkerStorageImpl(config, directory);
final ObjectMapper persistenceMapper = internalMapperFactory.createWorkerPersistenceMapper(workerStorage);
workerStorage.openStores(persistenceMapper);

dataset.setNamespacedStorageProvider(workerStorage);

// On the worker side we don't have to set the object writer for ForwardToWorkerMessages in WorkerInformation
WorkerInformation info = new WorkerInformation();
final WorkerInformation info = new WorkerInformation();
info.setDataset(dataset.getId());
info.setName(directory);
info.setEntityBucketSize(entityBucketSize);
Expand All @@ -110,8 +105,7 @@ public static Worker newWorker(
workerStorage.close();


return new Worker(queryThreadPoolDefinition, workerStorage, jobsExecutorService, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanLimit,
config.isLoadStoresOnStart());
return new Worker(queryThreadPoolDefinition, workerStorage, jobsExecutorService, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanLimit);
}

public ModificationShieldedWorkerStorage getStorage() {
@@ -229,4 +223,41 @@ public void addSecondaryId(SecondaryIdDescription secondaryId) {
public void removeSecondaryId(SecondaryIdDescriptionId secondaryId) {
storage.removeSecondaryId(secondaryId);
}

public void load(boolean loadStoresOnStart) {
jobManager.addSlowJob(new LoadWorkerStorageJob(loadStoresOnStart));

}

@Data
private final class LoadWorkerStorageJob extends Job {
private final boolean alsoLoadStorage;

@Override
public void execute() throws Exception {
log.info("BEGIN loading keys for {}", Worker.this);

final Stopwatch timer = Stopwatch.createStarted();

getStorage().loadKeys();

log.debug("DONE loading keys for {} within {}", Worker.this, timer);


if (alsoLoadStorage) {
timer.reset().start();

log.info("BEGIN loading data for {}", Worker.this);

getStorage().loadData();

log.debug("DONE loading data for {} within {}", Worker.this, timer);
}
}

@Override
public String getLabel() {
return "Load WorkerStorage %s".formatted(Worker.this);
}
}
}
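The LoadWorkerStorageJob above follows conquery's Job/JobManager machinery (the same Job and JobManager imported at the top of this file). A stripped-down sketch of that pattern follows; the class name ExampleLoadJob and the "worker-name" argument are illustrative, while the constructor and method signatures are the ones used elsewhere in this diff.

import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.jobs.JobManager;

// Illustrative slow job: it is queued on a JobManager and executed asynchronously,
// so isBusy() on the shard stays true until the queue has drained.
class ExampleLoadJob extends Job {

	@Override
	public void execute() throws Exception {
		// long-running work, e.g. loading the keys and data of a WorkerStorage
	}

	@Override
	public String getLabel() {
		return "Example load job";
	}
}

// Usage, mirroring Worker: the JobManager is created with a name and a failOnError flag,
// and load() enqueues the job instead of loading storage inside the constructor.
// JobManager jobManager = new JobManager("worker-name", failOnError);
// jobManager.addSlowJob(new ExampleLoadJob());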
@@ -147,7 +147,6 @@ public void execute(String name, TestConquery testConquery) throws Exception {

final StandaloneSupport support = testConquery.openDataset(dataset);


log.info("Restart complete");

DatasetRegistry<?> datasetRegistry = support.getDatasetRegistry();
@@ -13,7 +13,6 @@
import com.bakdata.conquery.models.index.IndexService;
import com.bakdata.conquery.models.worker.DatasetRegistry;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.ShardWorkers;
import com.bakdata.conquery.util.NonPersistentStoreFactory;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -59,12 +58,6 @@ public void before() {

// Prepare worker persistence mapper
workerStorage.openStores(shardInternalMapper);
ShardWorkers workers = new ShardWorkers(
config.getQueries().getExecutionPool(),
internalMapperFactory,
config.getCluster().getEntityBucketSize(),
config.getQueries().getSecondaryIdSubPlanRetention()
);
shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(workerStorage);

// Prepare api response mapper
@@ -75,7 +75,10 @@ public class TestConquery {
public synchronized StandaloneSupport openDataset(DatasetId datasetId) {
try {
log.info("loading dataset");
return createSupport(datasetId, datasetId.getName());
final StandaloneSupport support = createSupport(datasetId, datasetId.getName());
support.waitUntilWorkDone();

return support;
}
catch (Exception e) {
return fail("Failed to open dataset " + datasetId, e);
@@ -166,8 +169,7 @@ public void waitUntilWorkDone() {
for (int i = 0; i < 5; i++) {

await().atMost(10, TimeUnit.SECONDS)
.pollDelay(1, TimeUnit.MILLISECONDS)
.pollInterval(5, TimeUnit.MILLISECONDS)
.pollInterval(1, TimeUnit.MILLISECONDS)
.until(() -> !isBusy());
}
