Skip to content

ESQL: Don't build can_match queries we can't push to data nodes #130210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 1, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public String fieldName() {
return fieldName;
}

/**
 * Returns the {@code description} value of this builder.
 */
public String description() {
    return this.description;
}

@Override
public String getWriteableName() {
throw new UnsupportedOperationException("AutomatonQueryBuilder does not support getWriteableName");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,11 +38,15 @@
import java.util.stream.Stream;

import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.ccq.Clusters.REMOTE_CLUSTER_NAME;
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
public class MultiClustersIT extends ESRestTestCase {
Expand Down Expand Up @@ -129,7 +134,7 @@ void indexDocs(RestClient client, String index, List<Doc> docs) throws IOExcepti
}

private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query);
var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query).profile(true);
if (includeCCSMetadata) {
queryBuilder.includeCCSMetadata(true);
}
Expand Down Expand Up @@ -158,12 +163,51 @@ private Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder reques
}
}

/**
 * Verifies the response of a LIKE-style query, adapting the check to what the clusters support.
 * <p>
 * When both the local and the remote cluster report the required capabilities the full
 * {@link #assertResultMap} check runs. Otherwise only the absence of a partial result is
 * verified, because the data itself may legitimately differ on clusters without the feature.
 * </p>
 *
 * @param includeCCSMetadata        forwarded to {@code assertResultMap}
 * @param result                    the parsed query response
 * @param columns                   expected columns, forwarded to {@code assertResultMap}
 * @param values                    expected values, forwarded to {@code assertResultMap}
 * @param remoteOnly                forwarded to {@code assertResultMap}
 * @param requireLikeListCapability whether {@code like_list_on_index_fields} is required in
 *                                  addition to {@code like_on_index_fields}
 * @throws IOException if probing the cluster capabilities fails
 */
private <C, V> void assertResultMapForLike(
    boolean includeCCSMetadata,
    Map<String, Object> result,
    C columns,
    V values,
    boolean remoteOnly,
    boolean requireLikeListCapability
) throws IOException {
    List<String> requiredCapabilities = new ArrayList<>(List.of("like_on_index_fields"));
    if (requireLikeListCapability) {
        requiredCapabilities.add("like_list_on_index_fields");
    }

    // the feature is completely supported if both local and remote clusters support it;
    // reuse the shared helper instead of repeating the local + remote probe here
    if (capabilitiesSupportedNewAndOld(requiredCapabilities)) {
        assertResultMap(includeCCSMetadata, result, columns, values, remoteOnly);
        return;
    }

    logger.info("--> skipping data check for like index test, cluster does not support like index feature");
    // just verify that we did not get a partial result
    var clusters = result.get("_clusters");
    var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
    assertThat(reason, result.get("is_partial"), anyOf(nullValue(), is(false)));
}

/**
 * Checks whether both the local and the remote cluster report all of the given
 * {@code POST /_query} capabilities.
 *
 * @param requiredCapabilities capability names that must be present on both clusters
 * @return {@code true} only if the local cluster and the remote cluster both report them
 * @throws IOException if a capability request or closing the remote client fails
 */
private boolean capabilitiesSupportedNewAndOld(List<String> requiredCapabilities) throws IOException {
    final boolean localSupports = clusterHasCapability("POST", "/_query", List.of(), requiredCapabilities).orElse(false);
    // The remote client is always opened (and closed); the remote probe itself only runs
    // when the local cluster already reported support.
    try (RestClient remote = remoteClusterClient()) {
        return localSupports && clusterHasCapability(remote, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
    }
}

private <C, V> void assertResultMap(boolean includeCCSMetadata, Map<String, Object> result, C columns, V values, boolean remoteOnly) {
MapMatcher mapMatcher = getResultMatcher(
ccsMetadataAvailable(),
result.containsKey("is_partial"),
result.containsKey("documents_found")
);
).extraOk();
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
Expand Down Expand Up @@ -251,11 +295,13 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO

@SuppressWarnings("unchecked")
Map<String, Object> remoteClusterShards = (Map<String, Object>) remoteCluster.get("_shards");
assertThat(remoteClusterShards.keySet(), equalTo(Set.of("total", "successful", "skipped", "failed")));
assertThat((Integer) remoteClusterShards.get("total"), greaterThanOrEqualTo(0));
assertThat((Integer) remoteClusterShards.get("successful"), equalTo((Integer) remoteClusterShards.get("total")));
assertThat((Integer) remoteClusterShards.get("skipped"), equalTo(0));
assertThat((Integer) remoteClusterShards.get("failed"), equalTo(0));
assertThat(
remoteClusterShards,
matchesMap().entry("total", greaterThanOrEqualTo(0))
.entry("successful", remoteClusterShards.get("total"))
.entry("skipped", greaterThanOrEqualTo(0))
.entry("failed", 0)
);

if (remoteOnly == false) {
@SuppressWarnings("unchecked")
Expand All @@ -267,11 +313,13 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO

@SuppressWarnings("unchecked")
Map<String, Object> localClusterShards = (Map<String, Object>) localCluster.get("_shards");
assertThat(localClusterShards.keySet(), equalTo(Set.of("total", "successful", "skipped", "failed")));
assertThat((Integer) localClusterShards.get("total"), greaterThanOrEqualTo(0));
assertThat((Integer) localClusterShards.get("successful"), equalTo((Integer) localClusterShards.get("total")));
assertThat((Integer) localClusterShards.get("skipped"), equalTo(0));
assertThat((Integer) localClusterShards.get("failed"), equalTo(0));
assertThat(
localClusterShards,
matchesMap().entry("total", greaterThanOrEqualTo(0))
.entry("successful", localClusterShards.get("total"))
.entry("skipped", greaterThanOrEqualTo(0))
.entry("failed", 0)
);
}
}

Expand Down Expand Up @@ -371,6 +419,116 @@ public void testStats() throws IOException {
assertThat(clusterData, hasKey("took"));
}

/**
 * {@code _index LIKE "*remote*"} across the local and remote index: the expected result is a
 * single row counting the remote docs under the cluster-qualified remote index name.
 */
public void testLikeIndex() throws Exception {
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index LIKE "*remote*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    boolean withCcsMetadata = includeCCSMetadata();
    Map<String, Object> response = run(query, withCcsMetadata);

    var countColumn = Map.of("name", "c", "type", "long");
    var indexColumn = Map.of("name", "_index", "type", "keyword");
    var expectedColumns = List.of(countColumn, indexColumn);

    var remoteRow = List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex);
    var expectedRows = List.of(remoteRow);

    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedRows, false, false);
}

/**
 * {@code _index NOT LIKE "*remote*"} across the local and remote index: the expected result is
 * a single row counting the local docs under the local index name.
 */
public void testNotLikeIndex() throws Exception {
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index NOT LIKE "*remote*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    boolean withCcsMetadata = includeCCSMetadata();
    Map<String, Object> response = run(query, withCcsMetadata);

    var countColumn = Map.of("name", "c", "type", "long");
    var indexColumn = Map.of("name", "_index", "type", "keyword");
    var expectedColumns = List.of(countColumn, indexColumn);

    var localRow = List.of(localDocs.size(), localIndex);
    var expectedRows = List.of(localRow);

    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedRows, false, false);
}

/**
 * {@code _index LIKE ("*remote*", "not-exist*")} across the local and remote index: the
 * expected result is a single row counting the remote docs.
 * <p>
 * The test returns early (without asserting anything) unless both clusters report the
 * {@code like_list_on_index_fields} capability.
 * </p>
 */
public void testLikeListIndex() throws Exception {
    List<String> requiredCapabilities = List.of("like_list_on_index_fields");
    // the feature is completely supported if both local and remote clusters support it
    if (capabilitiesSupportedNewAndOld(requiredCapabilities) == false) {
        // name THIS test in the log line so skipped runs can be attributed correctly
        logger.info("--> skipping testLikeListIndex, due to missing capability");
        return;
    }
    boolean includeCCSMetadata = includeCCSMetadata();
    Map<String, Object> result = run("""
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index LIKE ("*remote*", "not-exist*")
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """, includeCCSMetadata);
    var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    var values = List.of(List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex));
    assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
}

public void testNotLikeListIndex() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testNotLikeListIndex will fail in some of the unit test (UT) runs; please see the latest version in my PR for the fix.
I just disabled this test when the cluster does not have the like_list_on_index_fields capability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please copy the latest version of this file from #130019 to get all the latest fixes on the tests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha

List<String> requiredCapabilities = new ArrayList<>(List.of("like_list_on_index_fields"));
// the feature is completely supported if both local and remote clusters support it
if (capabilitiesSupportedNewAndOld(requiredCapabilities) == false) {
logger.info("--> skipping testNotLikeListIndex, due to missing capability");
return;
}
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run("""
FROM test-local-index,*:test-remote-index METADATA _index
| WHERE _index NOT LIKE ("*remote*", "not-exist*")
| STATS c = COUNT(*) BY _index
| SORT _index ASC
""", includeCCSMetadata);
var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
var values = List.of(List.of(localDocs.size(), localIndex));
assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
}

/**
 * {@code color NOT LIKE ("*blue*", "*red*")} on a regular field across the local and remote
 * index, grouped by {@code _index}.
 * <p>
 * The test returns early (without asserting anything) unless both clusters report the
 * {@code like_list_on_index_fields} capability.
 * </p>
 */
public void testNotLikeListKeyWord() throws Exception {
    List<String> requiredCapabilities = List.of("like_list_on_index_fields");
    // the feature is completely supported if both local and remote clusters support it
    if (capabilitiesSupportedNewAndOld(requiredCapabilities) == false) {
        // name THIS test in the log line so skipped runs can be attributed correctly
        logger.info("--> skipping testNotLikeListKeyWord, due to missing capability");
        return;
    }
    boolean includeCCSMetadata = includeCCSMetadata();
    Map<String, Object> result = run("""
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE color NOT LIKE ("*blue*", "*red*")
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """, includeCCSMetadata);
    var columns = List.of(Map.of("name", "c", "type", "long"), Map.of("name", "_index", "type", "keyword"));
    // NOTE(review): this expects every local doc and no remote doc to pass the color filter;
    // confirm against the fixture data indexed for localDocs / remoteDocs.
    var values = List.of(List.of(localDocs.size(), localIndex));
    assertResultMapForLike(includeCCSMetadata, result, columns, values, false, true);
}

/**
 * {@code _index RLIKE ".*remote.*"} across the local and remote index: the expected result is
 * a single row counting the remote docs under the cluster-qualified remote index name.
 */
public void testRLikeIndex() throws Exception {
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index RLIKE ".*remote.*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    boolean withCcsMetadata = includeCCSMetadata();
    Map<String, Object> response = run(query, withCcsMetadata);

    var countColumn = Map.of("name", "c", "type", "long");
    var indexColumn = Map.of("name", "_index", "type", "keyword");
    var expectedColumns = List.of(countColumn, indexColumn);

    var remoteRow = List.of(remoteDocs.size(), REMOTE_CLUSTER_NAME + ":" + remoteIndex);
    var expectedRows = List.of(remoteRow);

    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedRows, false, false);
}

/**
 * {@code _index NOT RLIKE ".*remote.*"} across the local and remote index: the expected result
 * is a single row counting the local docs under the local index name.
 */
public void testNotRLikeIndex() throws Exception {
    final String query = """
        FROM test-local-index,*:test-remote-index METADATA _index
        | WHERE _index NOT RLIKE ".*remote.*"
        | STATS c = COUNT(*) BY _index
        | SORT _index ASC
        """;
    boolean withCcsMetadata = includeCCSMetadata();
    Map<String, Object> response = run(query, withCcsMetadata);

    var countColumn = Map.of("name", "c", "type", "long");
    var indexColumn = Map.of("name", "_index", "type", "keyword");
    var expectedColumns = List.of(countColumn, indexColumn);

    var localRow = List.of(localDocs.size(), localIndex);
    var expectedRows = List.of(localRow);

    assertResultMapForLike(withCcsMetadata, response, expectedColumns, expectedRows, false, false);
}

private RestClient remoteClusterClient() throws IOException {
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,12 @@ protected WildcardLikeList replaceChild(Expression newLeft) {
*/
@Override
public Translatable translatable(LucenePushdownPredicates pushdownPredicates) {
    // A non-null minimum transport version signals that the query is being extracted for
    // can_match, i.e. it would have to be sent to other nodes.
    if (pushdownPredicates.minTransportVersion() != null) {
        // The AutomatonQuery that we use right now isn't serializable.
        return Translatable.NO;
    }
    // No serialization involved: pushability depends only on the field itself.
    return pushdownPredicates.isPushableAttribute(field()) ? Translatable.YES : Translatable.NO;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
Expand All @@ -30,6 +33,24 @@
* </ol>
*/
public interface LucenePushdownPredicates {
/**
 * If we're extracting a query for {@code can_match} then this is the
 * minimum transport version in the cluster. Otherwise, this is {@code null}.
 * <p>
 * If this is not null {@link Expression}s should not claim to be
 * serializable unless their {@link QueryBuilder}
 * {@link QueryBuilder#supportsVersion supports} the version.
 * </p>
 * <p>
 * This is done on the coordinating node <strong>and</strong>, for
 * cross cluster search, on the coordinating node on the remote
 * cluster. So! We actually <strong>have</strong> the minimum
 * cluster transport version.
 * </p>
 *
 * @return the minimum transport version in the cluster when extracting a
 *         query for {@code can_match}, or {@code null} otherwise
 */
@Nullable
TransportVersion minTransportVersion();

/**
* For TEXT fields, we need to check if the field has a subfield of type KEYWORD that can be used instead.
*/
Expand Down Expand Up @@ -101,29 +122,41 @@ static String pushableAttributeName(TypedAttribute attribute) {
* In particular, it assumes TEXT fields have no exact subfields (underlying keyword field),
* and that isAggregatable means indexed and has hasDocValues.
*/
LucenePushdownPredicates DEFAULT = new LucenePushdownPredicates() {
@Override
public boolean hasExactSubfield(FieldAttribute attr) {
return false;
}
LucenePushdownPredicates DEFAULT = forCanMatch(null);

@Override
public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
// Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}
/**
* A {@link LucenePushdownPredicates} for use with the {@code can_match} phase.
*/
static LucenePushdownPredicates forCanMatch(TransportVersion minTransportVersion) {
return new LucenePushdownPredicates() {
@Override
public TransportVersion minTransportVersion() {
return minTransportVersion;
}

@Override
public boolean isIndexed(FieldAttribute attr) {
// TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}
@Override
public boolean hasExactSubfield(FieldAttribute attr) {
return false;
}

@Override
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldAttribute attr, String value) {
return false;
}
};
@Override
public boolean isIndexedAndHasDocValues(FieldAttribute attr) {
// Is the FieldType.isAggregatable() check correct here? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}

@Override
public boolean isIndexed(FieldAttribute attr) {
// TODO: This is the original behaviour, but is it correct? In FieldType isAggregatable usually only means hasDocValues
return attr.field().isAggregatable();
}

@Override
public boolean canUseEqualityOnSyntheticSourceDelegate(FieldAttribute attr, String value) {
return false;
}
};
}

/**
* If we have access to {@link SearchStats} over a collection of shards, we can make more fine-grained decisions about what can be
Expand All @@ -133,6 +166,11 @@ static LucenePushdownPredicates from(SearchStats stats) {
// TODO: use FieldAttribute#fieldName, otherwise this doesn't apply to field attributes used for union types.
// C.f. https://github.com/elastic/elasticsearch/issues/128905
return new LucenePushdownPredicates() {
@Override
public TransportVersion minTransportVersion() {
return null;
}

@Override
public boolean hasExactSubfield(FieldAttribute attr) {
return stats.hasExactSubfield(new FieldAttribute.FieldName(attr.name()));
Expand Down
Loading