Skip to content

Commit 21c8ab0

Browse files
samxbrjulian-elastic
authored and committed
Make GeoIp downloader multi-project aware (elastic#128282)
This change makes the GeoIp persistent task executor/downloader multi-project aware:
- The database downloader persistent task is now at the project level, meaning there is one downloader instance per project.
- The persistent task id is prefixed with the project id, namely `<project-id>/geoip-downloader`, for clusters in multi-project (MP) mode.
1 parent f672350 commit 21c8ab0

File tree

11 files changed

+499
-120
lines changed

11 files changed

+499
-120
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
apply plugin: 'elasticsearch.internal-java-rest-test'
11+
12+
dependencies {
13+
javaRestTestImplementation project(':modules:ingest-geoip')
14+
javaRestTestImplementation project(':test:external-modules:test-multi-project')
15+
javaRestTestImplementation project(':test:fixtures:geoip-fixture')
16+
17+
clusterModules project(':modules:ingest-geoip')
18+
clusterModules project(':modules:reindex') // needed for database cleanup
19+
clusterModules project(':test:external-modules:test-multi-project')
20+
}
21+
22+
tasks.withType(Test).configureEach {
23+
it.systemProperty "tests.multi_project.enabled", true
24+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package geoip;
11+
12+
import fixture.geoip.GeoIpHttpFixture;
13+
14+
import org.elasticsearch.client.Request;
15+
import org.elasticsearch.client.RequestOptions;
16+
import org.elasticsearch.core.Booleans;
17+
import org.elasticsearch.ingest.geoip.GeoIpDownloader;
18+
import org.elasticsearch.ingest.geoip.GeoIpDownloaderTaskExecutor;
19+
import org.elasticsearch.tasks.Task;
20+
import org.elasticsearch.test.cluster.ElasticsearchCluster;
21+
import org.elasticsearch.test.rest.ESRestTestCase;
22+
import org.elasticsearch.test.rest.ObjectPath;
23+
import org.junit.ClassRule;
24+
import org.junit.rules.RuleChain;
25+
import org.junit.rules.TestRule;
26+
27+
import java.io.IOException;
28+
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Objects;
31+
import java.util.Set;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.stream.Collectors;
34+
35+
import static org.hamcrest.Matchers.equalTo;
36+
37+
public class GeoIpMultiProjectIT extends ESRestTestCase {
38+
// defaults to true: the HTTP fixture is used unless the "geoip_use_service" system property is set to true
39+
private static final boolean useFixture = Booleans.parseBoolean(System.getProperty("geoip_use_service", "false")) == false;
40+
41+
public static final GeoIpHttpFixture fixture = new GeoIpHttpFixture(useFixture);
42+
43+
public static final ElasticsearchCluster cluster = ElasticsearchCluster.local()
44+
.module("ingest-geoip")
45+
.module("reindex") // for database cleanup
46+
.module("test-multi-project")
47+
.setting("test.multi_project.enabled", "true")
48+
.setting(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), "true")
49+
.setting(GeoIpDownloader.ENDPOINT_SETTING.getKey(), fixture::getAddress, (k) -> useFixture)
50+
.build();
51+
52+
@ClassRule
53+
public static TestRule ruleChain = RuleChain.outerRule(fixture).around(cluster);
54+
55+
@Override
56+
protected String getTestRestCluster() {
57+
return cluster.getHttpAddresses();
58+
}
59+
60+
@Override
61+
protected boolean shouldConfigureProjects() {
62+
return false;
63+
}
64+
65+
public void testGeoIpDownloader() throws Exception {
66+
String project1 = randomUniqueProjectId().id();
67+
String project2 = randomUniqueProjectId().id();
68+
createProject(project1);
69+
createProject(project2);
70+
71+
// download databases for project1
72+
putGeoIpPipeline(project1);
73+
assertBusy(() -> assertDatabases(project1, true), 30, TimeUnit.SECONDS);
74+
assertBusy(() -> assertDatabases(project2, false), 30, TimeUnit.SECONDS);
75+
76+
// download databases for project2
77+
putGeoIpPipeline(project2);
78+
assertBusy(() -> assertDatabases(project2, true), 30, TimeUnit.SECONDS);
79+
}
80+
81+
private void putGeoIpPipeline(String projectId) throws IOException {
82+
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/geoip-pipeline");
83+
putPipelineRequest.setJsonEntity("""
84+
{
85+
"processors" : [
86+
{
87+
"geoip" : {
88+
"field" : "ip",
89+
"target_field" : "geo",
90+
"database_file" : "GeoLite2-Country.mmdb"
91+
}
92+
}
93+
]
94+
}
95+
""");
96+
setRequestProjectId(projectId, putPipelineRequest);
97+
assertOK(client().performRequest(putPipelineRequest));
98+
}
99+
100+
private static Request setRequestProjectId(String projectId, Request request) {
101+
RequestOptions.Builder options = request.getOptions().toBuilder();
102+
options.removeHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER);
103+
options.addHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId);
104+
request.setOptions(options);
105+
return request;
106+
}
107+
108+
@SuppressWarnings("unchecked")
109+
private void assertDatabases(String projectId, boolean shouldDownload) throws IOException {
110+
Request getTaskState = new Request("GET", "/_cluster/state");
111+
setRequestProjectId(projectId, getTaskState);
112+
113+
ObjectPath state = ObjectPath.createFromResponse(assertOK(client().performRequest(getTaskState)));
114+
115+
List<Map<String, ?>> tasks = state.evaluate("metadata.persistent_tasks.tasks");
116+
// Short-circuit to avoid using streams if the list is empty
117+
if (tasks.isEmpty()) {
118+
fail("persistent tasks list is empty, expected at least one task for geoip-downloader");
119+
}
120+
121+
// verify project task id
122+
Set<Map<String, ?>> id = tasks.stream()
123+
.filter(task -> String.format("%s/geoip-downloader", projectId).equals(task.get("id")))
124+
.collect(Collectors.toSet());
125+
assertThat(id.size(), equalTo(1));
126+
127+
// verify database download
128+
Map<String, Object> databases = (Map<String, Object>) tasks.stream().map(task -> {
129+
try {
130+
return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases");
131+
} catch (IOException e) {
132+
return null;
133+
}
134+
}).filter(Objects::nonNull).findFirst().orElse(null);
135+
136+
if (shouldDownload) {
137+
// verify database downloaded
138+
assertNotNull(databases);
139+
for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
140+
Object database = databases.get(name);
141+
assertNotNull(database);
142+
assertNotNull(ObjectPath.evaluate(database, "md5"));
143+
}
144+
} else {
145+
// verify database not downloaded
146+
assertNull(databases);
147+
}
148+
149+
}
150+
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloader.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.support.PlainActionFuture;
1919
import org.elasticsearch.client.internal.Client;
2020
import org.elasticsearch.cluster.block.ClusterBlockLevel;
21+
import org.elasticsearch.cluster.metadata.ProjectId;
2122
import org.elasticsearch.cluster.service.ClusterService;
2223
import org.elasticsearch.common.hash.MessageDigests;
2324
import org.elasticsearch.common.settings.Setting;
@@ -95,6 +96,8 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
9596
*/
9697
private final Supplier<Boolean> atLeastOneGeoipProcessorSupplier;
9798

99+
private final ProjectId projectId;
100+
98101
GeoIpDownloader(
99102
Client client,
100103
HttpClient httpClient,
@@ -109,17 +112,19 @@ public class GeoIpDownloader extends AllocatedPersistentTask {
109112
Map<String, String> headers,
110113
Supplier<TimeValue> pollIntervalSupplier,
111114
Supplier<Boolean> eagerDownloadSupplier,
112-
Supplier<Boolean> atLeastOneGeoipProcessorSupplier
115+
Supplier<Boolean> atLeastOneGeoipProcessorSupplier,
116+
ProjectId projectId
113117
) {
114118
super(id, type, action, description, parentTask, headers);
115-
this.client = client;
119+
this.client = client.projectClient(projectId);
116120
this.httpClient = httpClient;
117121
this.clusterService = clusterService;
118122
this.threadPool = threadPool;
119123
this.endpoint = ENDPOINT_SETTING.get(settings);
120124
this.pollIntervalSupplier = pollIntervalSupplier;
121125
this.eagerDownloadSupplier = eagerDownloadSupplier;
122126
this.atLeastOneGeoipProcessorSupplier = atLeastOneGeoipProcessorSupplier;
127+
this.projectId = projectId;
123128
}
124129

125130
void setState(GeoIpTaskState state) {
@@ -134,16 +139,17 @@ void setState(GeoIpTaskState state) {
134139
// visible for testing
135140
void updateDatabases() throws IOException {
136141
var clusterState = clusterService.state();
137-
var geoipIndex = clusterState.getMetadata().getProject().getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
142+
var geoipIndex = clusterState.getMetadata().getProject(projectId).getIndicesLookup().get(GeoIpDownloader.DATABASES_INDEX);
138143
if (geoipIndex != null) {
139144
logger.trace("The {} index is not null", GeoIpDownloader.DATABASES_INDEX);
140-
if (clusterState.getRoutingTable().index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
145+
if (clusterState.routingTable(projectId).index(geoipIndex.getWriteIndex()).allPrimaryShardsActive() == false) {
141146
logger.debug(
142147
"Not updating geoip database because not all primary shards of the [" + DATABASES_INDEX + "] index are active."
143148
);
144149
return;
145150
}
146-
var blockException = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
151+
var blockException = clusterState.blocks()
152+
.indexBlockedException(projectId, ClusterBlockLevel.WRITE, geoipIndex.getWriteIndex().getName());
147153
if (blockException != null) {
148154
logger.debug(
149155
"Not updating geoip database because there is a write block on the " + geoipIndex.getWriteIndex().getName() + " index",
@@ -196,7 +202,7 @@ private void processDatabase(final String name, final String md5, final String u
196202
updateTimestamp(name, metadata);
197203
return;
198204
}
199-
logger.debug("downloading geoip database [{}]", name);
205+
logger.debug("downloading geoip database [{}] for project [{}]", name, projectId);
200206
long start = System.currentTimeMillis();
201207
try (InputStream is = httpClient.get(url)) {
202208
int firstChunk = metadata.lastChunk() + 1; // if there is no metadata, then Metadata.EMPTY.lastChunk() + 1 = 0
@@ -205,12 +211,12 @@ private void processDatabase(final String name, final String md5, final String u
205211
state = state.put(name, new Metadata(start, firstChunk, lastChunk - 1, md5, start));
206212
updateTaskState();
207213
stats = stats.successfulDownload(System.currentTimeMillis() - start).databasesCount(state.getDatabases().size());
208-
logger.info("successfully downloaded geoip database [{}]", name);
214+
logger.info("successfully downloaded geoip database [{}] for project [{}]", name, projectId);
209215
deleteOldChunks(name, firstChunk);
210216
}
211217
} catch (Exception e) {
212218
stats = stats.failedDownload();
213-
logger.error(() -> "error downloading geoip database [" + name + "]", e);
219+
logger.error(() -> "error downloading geoip database [" + name + "] for project [" + projectId + "]", e);
214220
}
215221
}
216222

@@ -230,15 +236,15 @@ void deleteOldChunks(String name, int firstChunk) {
230236

231237
// visible for testing
232238
protected void updateTimestamp(String name, Metadata old) {
233-
logger.debug("geoip database [{}] is up to date, updated timestamp", name);
239+
logger.debug("geoip database [{}] is up to date for project [{}], updated timestamp", name, projectId);
234240
state = state.put(name, new Metadata(old.lastUpdate(), old.firstChunk(), old.lastChunk(), old.md5(), System.currentTimeMillis()));
235241
stats = stats.skippedDownload();
236242
updateTaskState();
237243
}
238244

239245
void updateTaskState() {
240246
PlainActionFuture<PersistentTask<?>> future = new PlainActionFuture<>();
241-
updatePersistentTaskState(state, future);
247+
updateProjectPersistentTaskState(projectId, state, future);
242248
state = ((GeoIpTaskState) future.actionGet().getState());
243249
}
244250

@@ -360,5 +366,4 @@ private void scheduleNextRun(TimeValue time) {
360366
scheduled = threadPool.schedule(this::runDownloader, time, threadPool.generic());
361367
}
362368
}
363-
364369
}

0 commit comments

Comments
 (0)