diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index c69a7b6c7c991..8933322250b9a 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -254,6 +254,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59); public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60); public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61); + public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62); /* * STOP! READ THIS FIRST! No, really, diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 8070661865afc..a00a991b09e84 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2699,9 +2699,11 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX } protected static MapMatcher getProfileMatcher() { - return matchesMap().entry("query", instanceOf(Map.class)) + return matchesMap() // + .entry("query", instanceOf(Map.class)) .entry("planning", instanceOf(Map.class)) - .entry("drivers", instanceOf(List.class)); + .entry("drivers", instanceOf(List.class)) + .entry("plans", instanceOf(List.class)); } protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java index 538380fe7cd89..922049b8a97ec 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverCompletionInfo.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.operator; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -24,23 +25,34 @@ * roughly the number of documents times the number of * fields per document. Except {@code null} values don't count. * And multivalued fields count as many times as there are values. - * @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but + * @param driverProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but * not free so this will be empty if the {@code profile} option was not set in * the request. */ -public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List collectedProfiles) implements Writeable { +public record DriverCompletionInfo( + long documentsFound, + long valuesLoaded, + List driverProfiles, + List planProfiles +) implements Writeable { /** * Completion info we use when we didn't properly complete any drivers. * Usually this is returned with an error, but it's also used when receiving * responses from very old nodes. */ - public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of()); + public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of(), List.of()); /** * Build a {@link DriverCompletionInfo} for many drivers including their profile output. */ - public static DriverCompletionInfo includingProfiles(List drivers) { + public static DriverCompletionInfo includingProfiles( + List drivers, + String description, + String clusterName, + String nodeName, + String planTree + ) { long documentsFound = 0; long valuesLoaded = 0; List collectedProfiles = new ArrayList<>(drivers.size()); @@ -52,7 +64,12 @@ public static DriverCompletionInfo includingProfiles(List drivers) { } collectedProfiles.add(p); } - return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); + return new DriverCompletionInfo( + documentsFound, + valuesLoaded, + collectedProfiles, + List.of(new PlanProfile(description, clusterName, nodeName, planTree)) + ); } /** @@ -69,33 +86,45 @@ public static DriverCompletionInfo excludingProfiles(List drivers) { valuesLoaded += o.valuesLoaded(); } } - return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of()); + return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of(), List.of()); } - public DriverCompletionInfo(StreamInput in) throws IOException { - this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::new)); + public static DriverCompletionInfo readFrom(StreamInput in) throws IOException { + return new DriverCompletionInfo( + in.readVLong(), + in.readVLong(), + in.readCollectionAsImmutableList(DriverProfile::new), + in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19) + ? in.readCollectionAsImmutableList(PlanProfile::readFrom) + : List.of() + ); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(documentsFound); out.writeVLong(valuesLoaded); - out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o)); + out.writeCollection(driverProfiles); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)) { + out.writeCollection(planProfiles); + } } public static class Accumulator { private long documentsFound; private long valuesLoaded; - private final List collectedProfiles = new ArrayList<>(); + private final List driverProfiles = new ArrayList<>(); + private final List planProfiles = new ArrayList<>(); public void accumulate(DriverCompletionInfo info) { this.documentsFound += info.documentsFound; this.valuesLoaded += info.valuesLoaded; - this.collectedProfiles.addAll(info.collectedProfiles); + this.driverProfiles.addAll(info.driverProfiles); + this.planProfiles.addAll(info.planProfiles); } public DriverCompletionInfo finish() { - return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles); + return new DriverCompletionInfo(documentsFound, valuesLoaded, driverProfiles, planProfiles); } } @@ -103,15 +132,17 @@ public static class AtomicAccumulator { private final AtomicLong documentsFound = new AtomicLong(); private final AtomicLong valuesLoaded = new AtomicLong(); private final List collectedProfiles = Collections.synchronizedList(new ArrayList<>()); + private final List planProfiles = Collections.synchronizedList(new ArrayList<>()); public void accumulate(DriverCompletionInfo info) { this.documentsFound.addAndGet(info.documentsFound); this.valuesLoaded.addAndGet(info.valuesLoaded); - this.collectedProfiles.addAll(info.collectedProfiles); + this.collectedProfiles.addAll(info.driverProfiles); + this.planProfiles.addAll(info.planProfiles); } public DriverCompletionInfo finish() { - return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles); + return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles, planProfiles); } } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java new file mode 100644 index 0000000000000..502fa024d31ea --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PlanProfile.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; + +public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject { + + public static PlanProfile readFrom(StreamInput in) throws IOException { + return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(description); + out.writeString(clusterName); + out.writeString(nodeName); + out.writeString(planTree); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject() + .field("description", description) + .field("cluster_name", clusterName) + .field("node_name", nodeName) + .field("plan", planTree) + .endObject(); + } +} diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java index 358ab204dd088..56687d0585ff0 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/PushQueriesIT.java @@ -317,7 +317,9 @@ private void testPushQuery( result, getResultMatcher(result).entry( "profile", - matchesMap().entry("drivers", instanceOf(List.class)) + matchesMap() // + .entry("drivers", instanceOf(List.class)) + .entry("plans", instanceOf(List.class)) .entry("planning", matchesMap().extraOk()) .entry("query", matchesMap().extraOk()) ), diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java index 46b692b04ffe5..19db8b798cfad 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/StoredFieldsSequentialIT.java @@ -111,7 +111,9 @@ private void testQuery(Double percent, String query, int documentsFound, boolean matchesMap().entry("documents_found", documentsFound) .entry( "profile", - matchesMap().entry("drivers", instanceOf(List.class)) + matchesMap() // + .entry("drivers", instanceOf(List.class)) + .entry("plans", instanceOf(List.class)) .entry("planning", matchesMap().extraOk()) .entry("query", matchesMap().extraOk()) ) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java index e11b16ebc5335..abb4e0194fdbb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.PlanProfile; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -121,7 +122,7 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException { long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0; long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { - profile = in.readOptionalWriteable(Profile::new); + profile = in.readOptionalWriteable(Profile::readFrom); } boolean columnar = in.readBoolean(); EsqlExecutionInfo executionInfo = null; @@ -261,6 +262,7 @@ public Iterator toXContentChunked(ToXContent.Params params ob.field("planning", executionInfo.planningTimeSpan()); } ob.array("drivers", profile.drivers.iterator(), ChunkedToXContentBuilder::append); + ob.array("plans", profile.plans.iterator()); })); } }); @@ -372,41 +374,23 @@ public EsqlResponse responseInternal() { return esqlResponse; } - public static class Profile implements Writeable { - private final List drivers; + public record Profile(List drivers, List plans) implements Writeable { - public Profile(List drivers) { - this.drivers = drivers; - } - - public Profile(StreamInput in) throws IOException { - this.drivers = in.readCollectionAsImmutableList(DriverProfile::new); + public static Profile readFrom(StreamInput in) throws IOException { + return new Profile( + in.readCollectionAsImmutableList(DriverProfile::new), + in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19) + ? in.readCollectionAsImmutableList(PlanProfile::readFrom) + : List.of() + ); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeCollection(drivers); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)) { + out.writeCollection(plans); } - Profile profile = (Profile) o; - return Objects.equals(drivers, profile.drivers); - } - - @Override - public int hashCode() { - return Objects.hash(drivers); - } - - List drivers() { - return drivers; } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java index 7a236869dfcbf..82b96cfa6f236 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java @@ -60,10 +60,10 @@ final class ComputeResponse extends TransportResponse { ComputeResponse(StreamInput in) throws IOException { super(in); if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { - completionInfo = new DriverCompletionInfo(in); + completionInfo = DriverCompletionInfo.readFrom(in); } else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { if (in.readBoolean()) { - completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new)); + completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new), List.of()); } else { completionInfo = DriverCompletionInfo.EMPTY; } @@ -96,7 +96,7 @@ public void writeTo(StreamOutput out) throws IOException { completionInfo.writeTo(out); } else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) { out.writeBoolean(true); - out.writeCollection(completionInfo.collectedProfiles()); + out.writeCollection(completionInfo.driverProfiles()); } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeOptionalTimeValue(took); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index a05258ebde153..a48d953850eeb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -18,7 +18,6 @@ import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.lucene.DataPartitioning; -import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.FailureCollector; @@ -392,9 +391,7 @@ void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, @Override public SourceProvider createSourceProvider() { - final Supplier supplier = () -> super.createSourceProvider(); - return new ReinitializingSourceProvider(supplier); - + return new ReinitializingSourceProvider(super::createSourceProvider); } }; contexts.add( @@ -407,7 +404,6 @@ public SourceProvider createSourceProvider() { searchService.getIndicesService().getAnalysis(), defaultDataPartitioning ); - final List drivers; try { LocalExecutionPlanner planner = new LocalExecutionPlanner( context.sessionId(), @@ -428,37 +424,44 @@ public SourceProvider createSourceProvider() { LOGGER.debug("Received physical plan:\n{}", plan); - plan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan); + var localPlan = PlannerUtils.localPlan(context.searchExecutionContexts(), context.configuration(), context.foldCtx(), plan); // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below) // it's doing this in the planning of EsQueryExec (the source of the data) // see also EsPhysicalOperationProviders.sourcePhysicalOperation - LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.taskDescription(), context.foldCtx(), plan); + LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan( + context.taskDescription(), + context.foldCtx(), + localPlan + ); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe()); } - drivers = localExecutionPlan.createDrivers(context.sessionId()); + var drivers = localExecutionPlan.createDrivers(context.sessionId()); if (drivers.isEmpty()) { throw new IllegalStateException("no drivers created"); } LOGGER.debug("using {} drivers", drivers.size()); + driverRunner.executeDrivers( + task, + drivers, + transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME), + ActionListener.releaseAfter(listener.map(ignored -> { + if (context.configuration().profile()) { + return DriverCompletionInfo.includingProfiles( + drivers, + context.taskDescription(), + clusterService.getClusterName().value(), + transportService.getLocalNode().getName(), + localPlan.toString() + ); + } else { + return DriverCompletionInfo.excludingProfiles(drivers); + } + }), () -> Releasables.close(drivers)) + ); } catch (Exception e) { listener.onFailure(e); - return; } - ActionListener listenerCollectingStatus = listener.map(ignored -> { - if (context.configuration().profile()) { - return DriverCompletionInfo.includingProfiles(drivers); - } else { - return DriverCompletionInfo.excludingProfiles(drivers); - } - }); - listenerCollectingStatus = ActionListener.releaseAfter(listenerCollectingStatus, () -> Releasables.close(drivers)); - driverRunner.executeDrivers( - task, - drivers, - transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME), - listenerCollectingStatus - ); } static PhysicalPlan reductionPlan(ExchangeSinkExec plan, boolean enable) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java index 2477b515f66b2..0d21d37d1ac5e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeResponse.java @@ -15,6 +15,7 @@ import org.elasticsearch.transport.TransportResponse; import java.io.IOException; +import java.util.List; import java.util.Map; import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19; @@ -33,12 +34,12 @@ final class DataNodeComputeResponse extends TransportResponse { DataNodeComputeResponse(StreamInput in) throws IOException { if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) { - this.completionInfo = new DriverCompletionInfo(in); + this.completionInfo = DriverCompletionInfo.readFrom(in); this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); return; } if (DataNodeComputeHandler.supportShardLevelRetryFailure(in.getTransportVersion())) { - this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new)); + this.completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new), List.of()); this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException); return; } @@ -54,7 +55,7 @@ public void writeTo(StreamOutput out) throws IOException { return; } if (DataNodeComputeHandler.supportShardLevelRetryFailure(out.getTransportVersion())) { - out.writeCollection(completionInfo.collectedProfiles(), (o, v) -> v.writeTo(o)); + out.writeCollection(completionInfo.driverProfiles()); out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException); return; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 12bc18e1153c6..06986c856e7dd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -334,7 +334,7 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config return new ColumnInfoImpl(c.name(), c.dataType().outputType(), originalTypes); }).toList(); EsqlQueryResponse.Profile profile = configuration.profile() - ? new EsqlQueryResponse.Profile(result.completionInfo().collectedProfiles()) + ? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles()) : null; threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0"); if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index 091784210b47c..8a2344bffb678 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -7,15 +7,13 @@ package org.elasticsearch.xpack.esql.action; -import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; -import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.OperatorStatus; +import org.elasticsearch.compute.operator.PlanProfile; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.List; @@ -23,17 +21,19 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCase { @Override protected Writeable.Reader instanceReader() { - return EsqlQueryResponse.Profile::new; + return EsqlQueryResponse.Profile::readFrom; } @Override protected EsqlQueryResponse.Profile createTestInstance() { - return new EsqlQueryResponse.Profile(randomDriverProfiles()); + return new EsqlQueryResponse.Profile(randomDriverProfiles(), randomPlanProfiles()); } @Override protected EsqlQueryResponse.Profile mutateInstance(EsqlQueryResponse.Profile instance) { - return new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles)); + return randomBoolean() + ? new EsqlQueryResponse.Profile(randomValueOtherThan(instance.drivers(), this::randomDriverProfiles), instance.plans()) + : new EsqlQueryResponse.Profile(instance.drivers(), randomValueOtherThan(instance.plans(), this::randomPlanProfiles)); } @Override @@ -42,32 +42,39 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { } private List randomDriverProfiles() { - return randomList(10, this::randomDriverProfile); + return randomList( + 10, + () -> new DriverProfile( + randomIdentifier(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomList(10, this::randomOperatorStatus), + DriverSleeps.empty() + ) + ); } - private DriverProfile randomDriverProfile() { - return new DriverProfile( - RandomStrings.randomAsciiLettersOfLength(random(), 5), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomNonNegativeLong(), - randomList(10, this::randomOperatorStatus), - DriverSleeps.empty() + private List randomPlanProfiles() { + return randomList( + 10, + () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphanumericOfLength(1024)) ); } private OperatorStatus randomOperatorStatus() { - String name = randomAlphaOfLength(4); - Operator.Status status = randomBoolean() - ? null - : new AbstractPageMappingOperator.Status( - randomNonNegativeLong(), - randomNonNegativeInt(), - randomNonNegativeLong(), - randomNonNegativeLong() - ); - return new OperatorStatus(name, status); + return new OperatorStatus( + randomAlphaOfLength(4), + randomBoolean() + ? new AbstractPageMappingOperator.Status( + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ) + : null + ); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 0499b901b6e13..477ff7fa90b26 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; import org.elasticsearch.compute.operator.OperatorStatus; +import org.elasticsearch.compute.operator.PlanProfile; import org.elasticsearch.compute.test.TestBlockFactory; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; @@ -970,7 +971,8 @@ public void testProfileXContent() { List.of(new OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10021, 10, 111, 222))), DriverSleeps.empty() ) - ) + ), + List.of(new PlanProfile("test", "elasticsearch", "node-1", "plan tree")) ), false, false, @@ -1023,6 +1025,14 @@ public void testProfileXContent() { "last" : [ ] } } + ], + "plans" : [ + { + "description" : "test", + "cluster_name" : "elasticsearch", + "node_name" : "node-1", + "plan" : "plan tree" + } ] } }""")); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index b5e8547a981ab..72dc6af2cb5a0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -7,8 +7,6 @@ package org.elasticsearch.xpack.esql.plugin; -import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; @@ -18,6 +16,7 @@ import org.elasticsearch.compute.operator.DriverCompletionInfo; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; +import org.elasticsearch.compute.operator.PlanProfile; import org.elasticsearch.core.TimeValue; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.ESTestCase; @@ -61,12 +60,14 @@ public void shutdownTransportService() { } private DriverCompletionInfo randomCompletionInfo() { - int numProfiles = randomIntBetween(0, 2); - List profiles = new ArrayList<>(numProfiles); - for (int i = 0; i < numProfiles; i++) { - profiles.add( - new DriverProfile( - RandomStrings.randomAsciiLettersOfLength(random(), 5), + return new DriverCompletionInfo( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomList( + 0, + 2, + () -> new DriverProfile( + randomIdentifier(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -75,9 +76,13 @@ private DriverCompletionInfo randomCompletionInfo() { List.of(), DriverSleeps.empty() ) - ); - } - return new DriverCompletionInfo(randomNonNegativeLong(), randomNonNegativeLong(), profiles); + ), + randomList( + 0, + 2, + () -> new PlanProfile(randomIdentifier(), randomIdentifier(), randomIdentifier(), randomAlphaOfLengthBetween(1, 1024)) + ) + ); } public void testEmpty() { @@ -86,7 +91,7 @@ public void testEmpty() { assertFalse(results.isDone()); } assertTrue(results.isDone()); - assertThat(results.actionGet(10, TimeUnit.SECONDS).collectedProfiles(), empty()); + assertThat(results.actionGet(10, TimeUnit.SECONDS).driverProfiles(), empty()); } public void testCollectComputeResults() { @@ -109,7 +114,7 @@ public void testCollectComputeResults() { var info = randomCompletionInfo(); documentsFound += info.documentsFound(); valuesLoaded += info.valuesLoaded(); - allProfiles.addAll(info.collectedProfiles()); + allProfiles.addAll(info.driverProfiles()); ActionListener subListener = computeListener.acquireCompute(); threadPool.schedule( ActionRunnable.wrap(subListener, l -> l.onResponse(info)), @@ -123,7 +128,7 @@ public void testCollectComputeResults() { assertThat(actual.documentsFound(), equalTo(documentsFound)); assertThat(actual.valuesLoaded(), equalTo(valuesLoaded)); assertThat( - actual.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + actual.driverProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) ); assertThat(onFailure.get(), equalTo(0)); @@ -178,7 +183,7 @@ public void onResponse(DriverCompletionInfo result) { assertThat(result.documentsFound(), equalTo(documentsFound.get())); assertThat(result.valuesLoaded(), equalTo(valuesLoaded.get())); assertThat( - result.collectedProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), + result.driverProfiles().stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum)), equalTo(allProfiles.stream().collect(Collectors.toMap(p -> p, p -> 1, Integer::sum))) ); Map> responseHeaders = threadPool.getThreadContext() @@ -216,7 +221,7 @@ public void onFailure(Exception e) { var resp = randomCompletionInfo(); documentsFound.addAndGet(resp.documentsFound()); valuesLoaded.addAndGet(resp.valuesLoaded()); - allProfiles.addAll(resp.collectedProfiles()); + allProfiles.addAll(resp.driverProfiles()); int numWarnings = randomIntBetween(1, 5); Map warnings = new HashMap<>(); for (int i = 0; i < numWarnings; i++) {