From 2ec9072f96d886ad2041ed6ec4fd8e608c83415e Mon Sep 17 00:00:00 2001
From: awildturtok <1553491+awildturtok@users.noreply.github.com>
Date: Tue, 25 Mar 2025 18:03:14 +0100
Subject: [PATCH] First draft of loading ShardWorkers using JobManager and
 Lifecycle Managed

---
 .../bakdata/conquery/commands/ShardNode.java  | 43 +---------
 .../messages/network/specific/AddWorker.java  |  2 +-
 .../conquery/models/worker/ShardWorkers.java  | 84 +++++++++++++------
 .../conquery/models/worker/Worker.java        | 57 ++++++++++---
 .../integration/tests/RestartTest.java        |  1 -
 .../io/AbstractSerializationTest.java         |  7 --
 .../conquery/util/support/TestConquery.java   |  8 +-
 7 files changed, 110 insertions(+), 92 deletions(-)

diff --git a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java
index d6f784fac6..14f3831708 100644
--- a/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java
+++ b/backend/src/main/java/com/bakdata/conquery/commands/ShardNode.java
@@ -1,19 +1,11 @@
 package com.bakdata.conquery.commands;
 
-import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import com.bakdata.conquery.io.storage.WorkerStorage;
 import com.bakdata.conquery.mode.cluster.ClusterConnectionShard;
 import com.bakdata.conquery.mode.cluster.InternalMapperFactory;
 import com.bakdata.conquery.models.config.ConqueryConfig;
 import com.bakdata.conquery.models.worker.ShardWorkers;
 import com.bakdata.conquery.models.worker.Worker;
 import com.bakdata.conquery.tasks.LoadStorageTask;
-import com.bakdata.conquery.util.io.ConqueryMDC;
 import io.dropwizard.core.ConfiguredBundle;
 import io.dropwizard.core.setup.Environment;
 import io.dropwizard.lifecycle.setup.LifecycleEnvironment;
@@ -56,7 +48,9 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
 				config.getQueries().getExecutionPool(),
 				internalMapperFactory,
 				config.getCluster().getEntityBucketSize(),
-				config.getQueries().getSecondaryIdSubPlanRetention()
+				config.getQueries().getSecondaryIdSubPlanRetention(),
+				config.getStorage(),
+				config.isFailOnError()
 		);
 
 		lifecycle.manage(workers);
@@ -67,40 +61,9 @@ public void run(ConqueryConfig config, Environment environment) throws Exception
 				new ClusterConnectionShard(config, environment, workers, internalMapperFactory);
 
 		lifecycle.manage(clusterConnection);
-
-		final Collection<WorkerStorage> workerStorages = config.getStorage().discoverWorkerStorages();
-
-
-		ExecutorService loaders = config.getQueries().getExecutionPool().createService("Worker loader");
-
-		Queue<Worker> workersDone = new ConcurrentLinkedQueue<>();
-		for (WorkerStorage workerStorage : workerStorages) {
-			loaders.submit(() -> {
-				try {
-					workersDone.add(workers.createWorker(workerStorage, config.isFailOnError(), config.getStorage().isLoadStoresOnStart()));
-				}
-				catch (Exception e) {
-					log.error("Failed reading Storage", e);
-				}
-				finally {
-					log.debug("DONE reading Storage {}", workerStorage);
-					ConqueryMDC.clearLocation();
-				}
-			});
-		}
-
-		loaders.shutdown();
-		while (!loaders.awaitTermination(1, TimeUnit.MINUTES)) {
-
-			log.debug("Waiting for Worker workers to load. {} are already finished. {} pending", workersDone.size(), workerStorages.size()
-					  - workersDone.size());
-		}
-
-		log.info("All Worker loaded: {}", workers.getWorkers().size());
 	}
 
-
 	public boolean isBusy() {
 		return clusterConnection.isBusy() || workers.isBusy();
 	}
diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/AddWorker.java b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/AddWorker.java
index 8c8b29e4bd..f2e9ddca20 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/AddWorker.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/messages/network/specific/AddWorker.java
@@ -27,7 +27,7 @@ public void react(ShardNodeNetworkContext context) throws Exception {
 		log.info("creating a new worker for {}", dataset);
 
 		ConqueryConfig config = context.getConfig();
-		Worker worker = context.getWorkers().createWorker(dataset, config.getStorage(), createWorkerName(), config.isFailOnError());
+		Worker worker = context.getWorkers().newWorker(dataset, createWorkerName(), config.isFailOnError());
 
 		worker.setSession(context.getSession());
 
diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java
index 55b2241b26..40ad5fc952 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardWorkers.java
@@ -5,7 +5,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.bakdata.conquery.commands.ShardNode;
 import com.bakdata.conquery.io.jackson.MutableInjectableValues;
@@ -19,6 +18,7 @@
 import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
 import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
 import com.bakdata.conquery.models.jobs.SimpleJob;
+import com.bakdata.conquery.util.io.ConqueryMDC;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.dropwizard.lifecycle.Managed;
@@ -45,10 +45,17 @@ public class ShardWorkers implements NamespacedStorageProvider, Managed {
 	private final InternalMapperFactory internalMapperFactory;
 	private final int entityBucketSize;
 	private final int secondaryIdSubPlanRetention;
-	private final AtomicInteger nextWorker = new AtomicInteger(0);
+	private final StoreFactory storeFactory;
+	private final boolean failOnError;
 
-
-	public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapperFactory internalMapperFactory, int entityBucketSize, int secondaryIdSubPlanRetention) {
+
+	public ShardWorkers(
+			ThreadPoolDefinition queryThreadPoolDefinition,
+			InternalMapperFactory internalMapperFactory,
+			int entityBucketSize,
+			int secondaryIdSubPlanRetention,
+			StoreFactory storeFactory,
+			boolean failOnError) {
 		this.queryThreadPoolDefinition = queryThreadPoolDefinition;
 
 		// TODO This shouldn't be coupled to the query thread pool definition
@@ -57,39 +64,36 @@ public ShardWorkers(ThreadPoolDefinition queryThreadPoolDefinition, InternalMapp
 		this.internalMapperFactory = internalMapperFactory;
 		this.entityBucketSize = entityBucketSize;
 		this.secondaryIdSubPlanRetention = secondaryIdSubPlanRetention;
+		this.storeFactory = storeFactory;
+		this.failOnError = failOnError;
 
 		jobsThreadPool.prestartAllCoreThreads();
 	}
 
-	public Worker createWorker(WorkerStorage storage, boolean failOnError, boolean loadStorage) {
-
-		final ObjectMapper persistenceMapper = internalMapperFactory.createWorkerPersistenceMapper(storage);
+	public Worker newWorker(Dataset dataset, @NonNull String name, boolean failOnError) {
 
-		final Worker worker =
-				new Worker(queryThreadPoolDefinition, storage, jobsThreadPool, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanRetention, loadStorage);
+		final Worker worker = Worker.newWorker(
+				dataset,
+				queryThreadPoolDefinition,
+				jobsThreadPool,
+				storeFactory,
+				name,
+				failOnError,
+				entityBucketSize,
+				internalMapperFactory,
+				secondaryIdSubPlanRetention
+		);
 
-		addWorker(worker);
+		registerWorker(worker);
 
 		return worker;
 	}
 
-	private void addWorker(Worker worker) {
-		nextWorker.incrementAndGet();
+	private void registerWorker(Worker worker) {
 		workers.put(worker.getInfo().getId(), worker);
 		dataset2Worker.put(worker.getStorage().getDataset().getId(), worker);
 	}
 
-	public Worker createWorker(Dataset dataset, StoreFactory storageConfig, @NonNull String name, boolean failOnError) {
-
-		final Worker
-				worker =
-				Worker.newWorker(dataset, queryThreadPoolDefinition, jobsThreadPool, storageConfig, name, failOnError, entityBucketSize, internalMapperFactory, secondaryIdSubPlanRetention);
-
-		addWorker(worker);
-
-		return worker;
-	}
-
 	public Worker getWorker(WorkerId worker) {
 		return Objects.requireNonNull(workers.get(worker));
 	}
@@ -112,14 +116,14 @@ removed from dataset2Worker (which is used in deserialization of NamespacedIds,
 		try {
 			removed.remove();
 		}
-		catch(Exception e) {
+		catch (Exception e) {
 			log.error("Failed to remove storage {}", removed, e);
 		}
 	}
-
+
 	public boolean isBusy() {
-		for( Worker worker : workers.values()) {
-			if(worker.isBusy()) {
+		for (Worker worker : workers.values()) {
+			if (worker.isBusy()) {
 				return true;
 			}
 		}
@@ -128,12 +132,38 @@ public boolean isBusy() {
 
 	@Override
 	public void start() throws Exception {
+		final Collection<WorkerStorage> workerStorages = storeFactory.discoverWorkerStorages();
+
+		for (WorkerStorage workerStorage : workerStorages) {
+			try {
+				final Worker worker = workerFromStorage(workerStorage);
+				worker.load(storeFactory.isLoadStoresOnStart());
+			}
+			catch (Exception e) {
+				log.error("Failed initializing Storage", e);
+			}
+			finally {
+				ConqueryMDC.clearLocation();
+			}
+		}
 
 		for (Worker value : getWorkers().values()) {
 			value.getJobManager().addSlowJob(new SimpleJob("Update Bucket Manager", value.getBucketManager()::fullUpdate));
 		}
 	}
 
+	private Worker workerFromStorage(WorkerStorage storage) {
+
+		final ObjectMapper persistenceMapper = internalMapperFactory.createWorkerPersistenceMapper(storage);
+
+		final Worker worker =
+				new Worker(queryThreadPoolDefinition, storage, jobsThreadPool, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanRetention);
+
+		registerWorker(worker);
+
+		return worker;
+	}
+
 	@Override
 	public void stop() {
 		jobsThreadPool.shutdown();
diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/Worker.java b/backend/src/main/java/com/bakdata/conquery/models/worker/Worker.java
index 9e4c9e53dc..d2590bc38a 100644
--- a/backend/src/main/java/com/bakdata/conquery/models/worker/Worker.java
+++ b/backend/src/main/java/com/bakdata/conquery/models/worker/Worker.java
@@ -22,6 +22,7 @@
 import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
 import com.bakdata.conquery.models.identifiable.ids.specific.SecondaryIdDescriptionId;
 import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
+import com.bakdata.conquery.models.jobs.Job;
 import com.bakdata.conquery.models.jobs.JobManager;
 import com.bakdata.conquery.models.messages.namespaces.NamespaceMessage;
 import com.bakdata.conquery.models.messages.network.MessageToManagerNode;
@@ -29,6 +30,8 @@
 import com.bakdata.conquery.models.messages.network.specific.ForwardToNamespace;
 import com.bakdata.conquery.models.query.QueryExecutor;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Stopwatch;
+import lombok.Data;
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.Setter;
@@ -62,21 +65,13 @@ public Worker(
 			boolean failOnError,
 			int entityBucketSize,
 			ObjectMapper persistenceMapper,
-			int secondaryIdSubPlanLimit,
-			boolean loadStorage
+			int secondaryIdSubPlanLimit
 	) {
 		this.storage = storage;
 		this.jobsExecutorService = jobsExecutorService;
 
-		storage.openStores(persistenceMapper);
-		storage.loadKeys();
-
-		if (loadStorage) {
-			storage.loadData();
-		}
-
 		jobManager = new JobManager(storage.getWorker().getName(), failOnError);
 		queryExecutor = new QueryExecutor(this, queryThreadPoolDefinition.createService("QueryExecutor %d"), secondaryIdSubPlanLimit);
 		bucketManager = BucketManager.create(this, storage, entityBucketSize);
@@ -94,14 +89,14 @@ public static Worker newWorker(
 			InternalMapperFactory internalMapperFactory,
 			int secondaryIdSubPlanLimit) {
 
-		WorkerStorageImpl workerStorage = new WorkerStorageImpl(config, directory);
+		final WorkerStorageImpl workerStorage = new WorkerStorageImpl(config, directory);
 		final ObjectMapper persistenceMapper = internalMapperFactory.createWorkerPersistenceMapper(workerStorage);
 
 		workerStorage.openStores(persistenceMapper);
 		dataset.setNamespacedStorageProvider(workerStorage);
 
 		// On the worker side we don't have to set the object writer for ForwardToWorkerMessages in WorkerInformation
-		WorkerInformation info = new WorkerInformation();
+		final WorkerInformation info = new WorkerInformation();
 		info.setDataset(dataset.getId());
 		info.setName(directory);
 		info.setEntityBucketSize(entityBucketSize);
@@ -110,8 +105,7 @@ public static Worker newWorker(
 
 		workerStorage.close();
 
-		return new Worker(queryThreadPoolDefinition, workerStorage, jobsExecutorService, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanLimit,
-						  config.isLoadStoresOnStart());
+		return new Worker(queryThreadPoolDefinition, workerStorage, jobsExecutorService, failOnError, entityBucketSize, persistenceMapper, secondaryIdSubPlanLimit);
 	}
 
 	public ModificationShieldedWorkerStorage getStorage() {
@@ -229,4 +223,41 @@ public void addSecondaryId(SecondaryIdDescription secondaryId) {
 	public void removeSecondaryId(SecondaryIdDescriptionId secondaryId) {
 		storage.removeSecondaryId(secondaryId);
 	}
+
+	public void load(boolean loadStoresOnStart) {
+		jobManager.addSlowJob(new LoadWorkerStorageJob(loadStoresOnStart));
+
+	}
+
+	@Data
+	private final class LoadWorkerStorageJob extends Job {
+		private final boolean alsoLoadStorage;
+
+		@Override
+		public void execute() throws Exception {
+			log.info("BEGIN loading keys for {}", Worker.this);
+
+			final Stopwatch timer = Stopwatch.createStarted();
+
+			getStorage().loadKeys();
+
+			log.debug("DONE loading keys for {} within {}", Worker.this, timer);
+
+
+			if (alsoLoadStorage) {
+				timer.reset().start();
+
+				log.info("BEGIN loading data for {}", Worker.this);
+
+				getStorage().loadData();
+
+				log.debug("DONE loading data for {} within {}", Worker.this, timer);
+			}
+		}
+
+		@Override
+		public String getLabel() {
+			return "Load WorkerStorage %s".formatted(Worker.this);
+		}
+	}
 }
\ No newline at end of file
diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java
index 49a500cc42..63bc4aa9cf 100644
--- a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java
+++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java
@@ -147,7 +147,6 @@ public void execute(String name, TestConquery testConquery) throws Exception {
 
 		final StandaloneSupport support = testConquery.openDataset(dataset);
 
-		log.info("Restart complete");
 
 		DatasetRegistry<DistributedNamespace> datasetRegistry = support.getDatasetRegistry();
 
diff --git a/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java b/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java
index 48c1039273..a68ec661fe 100644
--- a/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java
+++ b/backend/src/test/java/com/bakdata/conquery/io/AbstractSerializationTest.java
@@ -13,7 +13,6 @@
 import com.bakdata.conquery.models.index.IndexService;
 import com.bakdata.conquery.models.worker.DatasetRegistry;
 import com.bakdata.conquery.models.worker.DistributedNamespace;
-import com.bakdata.conquery.models.worker.ShardWorkers;
 import com.bakdata.conquery.util.NonPersistentStoreFactory;
 import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -59,12 +58,6 @@ public void before() {
 
 		// Prepare worker persistence mapper
 		workerStorage.openStores(shardInternalMapper);
-		ShardWorkers workers = new ShardWorkers(
-				config.getQueries().getExecutionPool(),
-				internalMapperFactory,
-				config.getCluster().getEntityBucketSize(),
-				config.getQueries().getSecondaryIdSubPlanRetention()
-		);
 		shardInternalMapper = internalMapperFactory.createWorkerPersistenceMapper(workerStorage);
 
 		// Prepare api response mapper
diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java
index 18aece82fd..76f9339975 100644
--- a/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java
+++ b/backend/src/test/java/com/bakdata/conquery/util/support/TestConquery.java
@@ -75,7 +75,10 @@ public class TestConquery {
 	public synchronized StandaloneSupport openDataset(DatasetId datasetId) {
 		try {
 			log.info("loading dataset");
-			return createSupport(datasetId, datasetId.getName());
+			final StandaloneSupport support = createSupport(datasetId, datasetId.getName());
+			support.waitUntilWorkDone();
+
+			return support;
 		}
 		catch (Exception e) {
 			return fail("Failed to open dataset " + datasetId, e);
@@ -166,8 +169,7 @@ public void waitUntilWorkDone() {
 		for (int i = 0; i < 5; i++) {
 
 			await().atMost(10, TimeUnit.SECONDS)
-				   .pollDelay(1, TimeUnit.MILLISECONDS)
-				   .pollInterval(5, TimeUnit.MILLISECONDS)
+				   .pollInterval(1, TimeUnit.MILLISECONDS)
 				   .until(() -> !isBusy());
 		}
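For context, the heart of this patch is moving worker loading out of ShardNode.run() and behind Dropwizard's Managed lifecycle, with the actual storage reads enqueued as slow jobs on each Worker's JobManager. A minimal sketch of the Managed pattern, assuming only the Dropwizard 4 lifecycle API (the StorageLoader class below is illustrative, not part of the codebase):

import io.dropwizard.core.setup.Environment;
import io.dropwizard.lifecycle.Managed;

// Illustrative stand-in for ShardWorkers: heavy initialisation is deferred to
// start(), which Dropwizard invokes as part of server startup, after run()
// has returned and all components are wired together.
public class StorageLoader implements Managed {

	@Override
	public void start() throws Exception {
		// Discover and open storages here instead of blocking run().
		// In the patch, ShardWorkers.start() additionally hands the actual
		// data loading to each Worker's JobManager via Worker.load(...).
	}

	@Override
	public void stop() throws Exception {
		// Invoked on shutdown, with managed objects stopped in reverse
		// registration order; the patch uses this hook to shut down the
		// shared jobs thread pool.
	}

	// Registration mirrors `lifecycle.manage(workers)` in ShardNode.run():
	public static void register(Environment environment) {
		environment.lifecycle().manage(new StorageLoader());
	}
}

Because loading now runs inside the managed lifecycle and the JobManager, the shard no longer needs the hand-rolled "Worker loader" executor and awaitTermination() polling loop that this patch deletes from ShardNode; tests instead synchronise on job completion, as seen in TestConquery.openDataset() above.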