Skip to content

[8.19] Add query plans to profile output #130181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59);
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be added in a PR to main first that include the isPatchFrom logic to parse it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, opened a pr for that: #130187


/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2699,9 +2699,11 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX
}

protected static MapMatcher getProfileMatcher() {
return matchesMap().entry("query", instanceOf(Map.class))
return matchesMap() //
.entry("query", instanceOf(Map.class))
.entry("planning", instanceOf(Map.class))
.entry("drivers", instanceOf(List.class));
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class));
}

protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial, boolean includeDocumentsFound) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.compute.operator;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand All @@ -24,23 +25,34 @@
* <strong>roughly</strong> the number of documents times the number of
* fields per document. Except {@code null} values don't count.
* And multivalued fields count as many times as there are values.
* @param collectedProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* @param driverProfiles {@link DriverProfile}s from each driver. These are fairly cheap to build but
* not free so this will be empty if the {@code profile} option was not set in
* the request.
*/
public record DriverCompletionInfo(long documentsFound, long valuesLoaded, List<DriverProfile> collectedProfiles) implements Writeable {
public record DriverCompletionInfo(
long documentsFound,
long valuesLoaded,
List<DriverProfile> driverProfiles,
List<PlanProfile> planProfiles
) implements Writeable {

/**
* Completion info we use when we didn't properly complete any drivers.
* Usually this is returned with an error, but it's also used when receiving
* responses from very old nodes.
*/
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of());
public static final DriverCompletionInfo EMPTY = new DriverCompletionInfo(0, 0, List.of(), List.of());

/**
* Build a {@link DriverCompletionInfo} for many drivers including their profile output.
*/
public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
public static DriverCompletionInfo includingProfiles(
List<Driver> drivers,
String description,
String clusterName,
String nodeName,
String planTree
) {
long documentsFound = 0;
long valuesLoaded = 0;
List<DriverProfile> collectedProfiles = new ArrayList<>(drivers.size());
Expand All @@ -52,7 +64,12 @@ public static DriverCompletionInfo includingProfiles(List<Driver> drivers) {
}
collectedProfiles.add(p);
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
return new DriverCompletionInfo(
documentsFound,
valuesLoaded,
collectedProfiles,
List.of(new PlanProfile(description, clusterName, nodeName, planTree))
);
}

/**
Expand All @@ -69,49 +86,63 @@ public static DriverCompletionInfo excludingProfiles(List<Driver> drivers) {
valuesLoaded += o.valuesLoaded();
}
}
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of());
return new DriverCompletionInfo(documentsFound, valuesLoaded, List.of(), List.of());
}

public DriverCompletionInfo(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong(), in.readCollectionAsImmutableList(DriverProfile::new));
public static DriverCompletionInfo readFrom(StreamInput in) throws IOException {
return new DriverCompletionInfo(
in.readVLong(),
in.readVLong(),
in.readCollectionAsImmutableList(DriverProfile::new),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
: List.of()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(documentsFound);
out.writeVLong(valuesLoaded);
out.writeCollection(collectedProfiles, (o, v) -> v.writeTo(o));
out.writeCollection(driverProfiles);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)) {
out.writeCollection(planProfiles);
}
}

public static class Accumulator {
private long documentsFound;
private long valuesLoaded;
private final List<DriverProfile> collectedProfiles = new ArrayList<>();
private final List<DriverProfile> driverProfiles = new ArrayList<>();
private final List<PlanProfile> planProfiles = new ArrayList<>();

public void accumulate(DriverCompletionInfo info) {
this.documentsFound += info.documentsFound;
this.valuesLoaded += info.valuesLoaded;
this.collectedProfiles.addAll(info.collectedProfiles);
this.driverProfiles.addAll(info.driverProfiles);
this.planProfiles.addAll(info.planProfiles);
}

public DriverCompletionInfo finish() {
return new DriverCompletionInfo(documentsFound, valuesLoaded, collectedProfiles);
return new DriverCompletionInfo(documentsFound, valuesLoaded, driverProfiles, planProfiles);
}
}

public static class AtomicAccumulator {
private final AtomicLong documentsFound = new AtomicLong();
private final AtomicLong valuesLoaded = new AtomicLong();
private final List<DriverProfile> collectedProfiles = Collections.synchronizedList(new ArrayList<>());
private final List<PlanProfile> planProfiles = Collections.synchronizedList(new ArrayList<>());

public void accumulate(DriverCompletionInfo info) {
this.documentsFound.addAndGet(info.documentsFound);
this.valuesLoaded.addAndGet(info.valuesLoaded);
this.collectedProfiles.addAll(info.collectedProfiles);
this.collectedProfiles.addAll(info.driverProfiles);
this.planProfiles.addAll(info.planProfiles);
}

public DriverCompletionInfo finish() {
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles);
return new DriverCompletionInfo(documentsFound.get(), valuesLoaded.get(), collectedProfiles, planProfiles);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

public record PlanProfile(String description, String clusterName, String nodeName, String planTree) implements Writeable, ToXContentObject {

public static PlanProfile readFrom(StreamInput in) throws IOException {
return new PlanProfile(in.readString(), in.readString(), in.readString(), in.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(description);
out.writeString(clusterName);
out.writeString(nodeName);
out.writeString(planTree);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject()
.field("description", description)
.field("cluster_name", clusterName)
.field("node_name", nodeName)
.field("plan", planTree)
.endObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ private void testPushQuery(
result,
getResultMatcher(result).entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
matchesMap() //
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ private void testQuery(Double percent, String query, int documentsFound, boolean
matchesMap().entry("documents_found", documentsFound)
.entry(
"profile",
matchesMap().entry("drivers", instanceOf(List.class))
matchesMap() //
.entry("drivers", instanceOf(List.class))
.entry("plans", instanceOf(List.class))
.entry("planning", matchesMap().extraOk())
.entry("query", matchesMap().extraOk())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.PlanProfile;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -121,7 +122,7 @@ static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0;
long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19) ? in.readVLong() : 0;
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
profile = in.readOptionalWriteable(Profile::new);
profile = in.readOptionalWriteable(Profile::readFrom);
}
boolean columnar = in.readBoolean();
EsqlExecutionInfo executionInfo = null;
Expand Down Expand Up @@ -261,6 +262,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
ob.field("planning", executionInfo.planningTimeSpan());
}
ob.array("drivers", profile.drivers.iterator(), ChunkedToXContentBuilder::append);
ob.array("plans", profile.plans.iterator());
}));
}
});
Expand Down Expand Up @@ -372,41 +374,23 @@ public EsqlResponse responseInternal() {
return esqlResponse;
}

public static class Profile implements Writeable {
private final List<DriverProfile> drivers;
public record Profile(List<DriverProfile> drivers, List<PlanProfile> plans) implements Writeable {

public Profile(List<DriverProfile> drivers) {
this.drivers = drivers;
}

public Profile(StreamInput in) throws IOException {
this.drivers = in.readCollectionAsImmutableList(DriverProfile::new);
public static Profile readFrom(StreamInput in) throws IOException {
return new Profile(
in.readCollectionAsImmutableList(DriverProfile::new),
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)
? in.readCollectionAsImmutableList(PlanProfile::readFrom)
: List.of()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(drivers);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_INCLUDE_PLAN_8_19)) {
out.writeCollection(plans);
}
Profile profile = (Profile) o;
return Objects.equals(drivers, profile.drivers);
}

@Override
public int hashCode() {
return Objects.hash(drivers);
}

List<DriverProfile> drivers() {
return drivers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ final class ComputeResponse extends TransportResponse {
ComputeResponse(StreamInput in) throws IOException {
super(in);
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19)) {
completionInfo = new DriverCompletionInfo(in);
completionInfo = DriverCompletionInfo.readFrom(in);
} else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new));
completionInfo = new DriverCompletionInfo(0, 0, in.readCollectionAsImmutableList(DriverProfile::new), List.of());
} else {
completionInfo = DriverCompletionInfo.EMPTY;
}
Expand Down Expand Up @@ -96,7 +96,7 @@ public void writeTo(StreamOutput out) throws IOException {
completionInfo.writeTo(out);
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeBoolean(true);
out.writeCollection(completionInfo.collectedProfiles());
out.writeCollection(completionInfo.driverProfiles());
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
out.writeOptionalTimeValue(took);
Expand Down
Loading