Skip to content

Do not dispatch in IngestService #130253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ private void processBulkIndexIngestRequest(
) {
final long ingestStartTimeInNanos = relativeTimeNanos();
final BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original);
final Thread originalThread = Thread.currentThread();
getIngestService(original).executeBulkRequest(
metadata.id(),
original.numberOfActions(),
Expand All @@ -305,50 +306,42 @@ private void processBulkIndexIngestRequest(
(indexName) -> resolveFailureStore(indexName, metadata, threadPool.absoluteTimeInMillis()),
bulkRequestModifier::markItemForFailureStore,
bulkRequestModifier::markItemAsFailed,
(originalThread, exception) -> {
if (exception != null) {
logger.debug("failed to execute pipeline for a bulk request", exception);
listener.onFailure(exception);
listener.delegateFailureAndWrap((l, unused) -> {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, l);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(relativeTimeNanos() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(
ingestTookInMillis,
listener
);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
// (this will happen if pre-processing all items in the bulk failed)
actionListener.onResponse(new BulkResponse(new BulkItemResponse[0], 0));
} else {
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
}
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() throws IOException {
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
}

@Override
public boolean isForceExecution() {
// If we fork back to a coordination thread we **not** should fail, because tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
};
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
// coordination steps on the write thread
if (originalThread == Thread.currentThread()) {
runnable.run();
} else {
executor.execute(runnable);
@Override
public boolean isForceExecution() {
// If we fork back to a coordination thread we **not** should fail, because tp queue is full.
// (Otherwise the work done during ingest will be lost)
// It is okay to force execution here. Throttling of write requests happens prior to
// ingest when a node receives a bulk request.
return true;
}
};
// If a processor went async and returned a response on a different thread then
// before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
// coordination steps on the write thread
if (originalThread == Thread.currentThread()) {
runnable.run();
} else {
executor.execute(runnable);
}
}
},
executor

})
);
}

Expand Down
18 changes: 6 additions & 12 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -823,10 +822,7 @@ private static IngestPipelinesExecutionResult failWithoutStoringIn(String index,
* @param onFailure A callback executed when a document fails ingestion and does not need to be
* persisted. Accepts the slot in the collection of requests that the document
* occupies, and the exception that the document encountered.
* @param onCompletion A callback executed once all documents have been processed. Accepts the thread
* that ingestion completed on or an exception in the event that the entire operation
* has failed.
* @param executor Which executor the bulk request should be executed on.
* @param listener A callback executed once all documents have been processed.
*/
public void executeBulkRequest(
final ProjectId projectId,
Expand All @@ -836,25 +832,23 @@ public void executeBulkRequest(
final Function<String, Boolean> resolveFailureStore,
final TriConsumer<Integer, String, Exception> onStoreFailure,
final TriConsumer<Integer, Exception, IndexDocFailureStoreStatus> onFailure,
final BiConsumer<Thread, Exception> onCompletion,
final Executor executor
final ActionListener<Void> listener
) {
assert numberOfActionRequests > 0 : "numberOfActionRequests must be greater than 0 but was [" + numberOfActionRequests + "]";

// Adapt handler to ensure node features during ingest logic
final Function<String, Boolean> adaptedResolveFailureStore = wrapResolverWithFeatureCheck(resolveFailureStore);

executor.execute(new AbstractRunnable() {
new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
onCompletion.accept(null, e);
listener.onFailure(e);
}

@Override
protected void doRun() {
final Thread originalThread = Thread.currentThread();
try (var refs = new RefCountingRunnable(() -> onCompletion.accept(originalThread, null))) {
try (var refs = new RefCountingRunnable(() -> listener.onResponse(null))) {
int i = 0;
for (DocWriteRequest<?> actionRequest : actionRequests) {
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest);
Expand Down Expand Up @@ -933,7 +927,7 @@ public void onFailure(Exception e) {
}
}
}
});
}.run();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,13 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -127,7 +125,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
@Captor
ArgumentCaptor<TriConsumer<Integer, Exception, IndexDocFailureStoreStatus>> failureHandler;
@Captor
ArgumentCaptor<BiConsumer<Thread, Exception>> completionHandler;
ArgumentCaptor<ActionListener<Void>> listener;
@Captor
ArgumentCaptor<TransportResponseHandler<BulkResponse>> remoteResponseHandler;
@Captor
Expand Down Expand Up @@ -425,10 +423,9 @@ public void testIngestLocal() throws Exception {
redirectPredicate.capture(),
redirectHandler.capture(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
completionHandler.getValue().accept(null, exception);
listener.getValue().onFailure(exception);
assertTrue(failureCalled.get());

// now check success
Expand All @@ -441,7 +438,7 @@ public void testIngestLocal() throws Exception {
assertNull(redirectPredicate.getValue().apply(WITH_DEFAULT_PIPELINE)); // no redirects for random existing indices
assertNull(redirectPredicate.getValue().apply("index")); // no redirects for non-existent indices with no templates
redirectHandler.getValue().apply(2, WITH_FAILURE_STORE_ENABLED + "-1", exception); // exception and redirect for request 3 (slot 2)
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null); // all ingestion completed
listener.getValue().onResponse(null); // all ingestion completed
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyNoMoreInteractions(transportService);
Expand Down Expand Up @@ -476,15 +473,14 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
completionHandler.getValue().accept(null, exception);
listener.getValue().onFailure(exception);
assertTrue(failureCalled.get());

// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
listener.getValue().onResponse(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyNoMoreInteractions(transportService);
Expand Down Expand Up @@ -525,18 +521,17 @@ public void testIngestSystemLocal() throws Exception {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(systemWriteCoordinationExecutor)
listener.capture()
);
completionHandler.getValue().accept(null, exception);
listener.getValue().onFailure(exception);
assertTrue(failureCalled.get());

// now check success
Iterator<DocWriteRequest<?>> req = bulkDocsItr.getValue().iterator();
// have an exception for our one index request
failureHandler.getValue().apply(0, exception, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
listener.getValue().onResponse(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyNoMoreInteractions(transportService);
Expand All @@ -558,7 +553,7 @@ public void testIngestForward() throws Exception {
ActionTestUtils.execute(action, null, bulkRequest, listener);

// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any());
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture());
Expand Down Expand Up @@ -598,7 +593,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
ActionTestUtils.execute(singleItemBulkWriteAction, null, indexRequest, listener);

// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any(), any());
verify(ingestService, never()).executeBulkRequest(eq(projectId), anyInt(), any(), any(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(TransportBulkAction.NAME), any(), remoteResponseHandler.capture());
Expand Down Expand Up @@ -686,20 +681,19 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
assertEquals(indexRequest3.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(null, exception);
listener.getValue().onFailure(exception);
assertTrue(failureCalled.get());

// now check success of the transport bulk action
indexRequest1.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
indexRequest3.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
listener.getValue().onResponse(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyNoMoreInteractions(transportService);
Expand Down Expand Up @@ -737,16 +731,15 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
completionHandler.getValue().accept(null, exception);
listener.getValue().onFailure(exception);
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
assertTrue(failureCalled.get());

// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
listener.getValue().onResponse(null);
assertTrue(action.isExecuted);
assertTrue(action.indexCreated); // now the index is created since we skipped the ingest node path.
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
Expand Down Expand Up @@ -831,8 +824,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
}

Expand Down Expand Up @@ -872,8 +864,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
}

Expand Down Expand Up @@ -902,11 +893,10 @@ public void testIngestCallbackExceptionHandled() throws Exception {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
indexRequest1.autoGenerateId();
completionHandler.getValue().accept(Thread.currentThread(), null);
listener.getValue().onResponse(null);

// check failure passed through to the listener
assertFalse(action.isExecuted);
Expand Down Expand Up @@ -942,16 +932,15 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
any(),
any(),
failureHandler.capture(),
completionHandler.capture(),
same(writeCoordinationExecutor)
listener.capture()
);
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(null, exception);
listener.getValue().onFailure(exception);
assertTrue(failureCalled.get());

// now check success
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing
completionHandler.getValue().accept(DUMMY_WRITE_THREAD, null);
listener.getValue().onResponse(null);
assertTrue(action.isExecuted);
assertFalse(responseCalled.get()); // listener would only be called by real index action, not our mocked one
verifyNoMoreInteractions(transportService);
Expand Down
Loading