Skip to content

Commit ab15b90

Browse files
Merge pull request #19 from navdeepsekhon/18-preserve-partitions
#18 option to preserve partition count
2 parents 02c44b0 + 2d7ced6 commit ab15b90

File tree

6 files changed

+52
-3
lines changed

6 files changed

+52
-3
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
- [Delete created topics (--wipe)](#delete-created-topics)
1515
- [Delete schemas from schema registry (--wipe-schemas)](#delete-schemas)
1616
- [Debug (--debug)](#debug)
17+
- [Preserve Partition Count (--preserve-partition-count)](#preserve-partition-count)
1718
- [Contributions](#contributions)
1819

1920

@@ -122,6 +123,8 @@ If the partitions listed in config are less than the existing - an exception wil
122123

123124
If they are same - nothing.
124125

126+
If flag `--preserve-partition-count` is used, partitions will not be updated.
127+
125128
### All other configs:
126129
All other configs will be updated to the new values from config.
127130

@@ -216,6 +219,10 @@ kafkaer.schema.registry.ssl.truststore.location=...
216219

217220
Use flag `--debug` for detailed logging
218221

222+
# Preserve Partition Count
223+
224+
If a topic already exists and it's partition count is different from what is defined in the config, kafkaer will try update the partitions as described above. In order to ignore the partition count and keep the existing partitions, `--preserve-partition-count` flag can be used. When used, the difference is partition count will only be logged.
225+
219226
# Contributions
220227
Merge requests welcome. Please create an issue with change details and link it to your merge request.
221228

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ apply plugin: 'maven'
66
apply plugin: 'signing'
77

88
group 'co.navdeep'
9-
version '1.4.1'
9+
version '1.4.2'
1010
archivesBaseName = "kafkaer"
1111

1212
sourceCompatibility = 1.8

src/main/java/co/navdeep/kafkaer/App.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ public static void main(String[] a) throws Exception {
2828
throw new RuntimeException("Missing required arguments - propertiesLocation, configLocation");
2929
}
3030

31-
logger.debug("Input args: config: [{}] properties: [{}] wipe:[{}] confirm-delete: [{}], wipe-schema: [{}]", args.getConfig(), args.getProperties(), args.isWipe(), args.isConfirmDelete(), args.isWipeSchemas());
31+
logger.debug("Input args: config: [{}] properties: [{}] wipe:[{}] confirm-delete: [{}], wipe-schema: [{}], preserve-partition-count: [{}]", args.getConfig(), args.getProperties(), args.isWipe(), args.isConfirmDelete(), args.isWipeSchemas(), args.isPreservePartitionCount());
3232
Configurator configurator = new Configurator(args.getProperties(), args.getConfig());
33+
configurator.setPreservePartitionCount(args.isPreservePartitionCount());
3334
if(args.isWipe())
3435
configurator.wipeTopics(args.isConfirmDelete(), args.isWipeSchemas());
3536
else

src/main/java/co/navdeep/kafkaer/Args.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ public class Args {
2727
@Option(name="--confirm-delete", usage="Used with --wipe. Will wait for all brokers to sync up to ensure topic is deleted from all. Default max wait 60s. Configure using " + Utils.MAX_DELETE_CONFIRM_WAIT_CONFIG, handler = BooleanOptionHandler.class)
2828
boolean confirmDelete;
2929

30+
@Option(name="--preserve-partition-count", usage="If a topic already exists and it's partition count is different from config, the partition count will not be changed.", handler = BooleanOptionHandler.class)
31+
boolean preservePartitionCount;
32+
3033
@Option(name="--help", aliases= "-h", help = true, usage="list usage", handler = BooleanOptionHandler.class)
3134
boolean help;
3235

src/main/java/co/navdeep/kafkaer/Configurator.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class Configurator {
2929
private Config config;
3030
private AdminClient adminClient;
3131
private SchemaRegistryClient schemaRegistryClient;
32+
private boolean preservePartitionCount;
3233

3334
private static Logger logger = LoggerFactory.getLogger(Configurator.class);
3435

@@ -194,7 +195,21 @@ private void handleTopicConfigUpdate(Topic topic) throws InterruptedException {
194195
throw new RuntimeException(e);
195196
}
196197
}
198+
197199
private void handleTopicPartitionsUpdate(TopicDescription current, Topic topic) throws InterruptedException {
200+
if(preservePartitionCount) logPartitionDiff(current, topic);
201+
else updatePartitions(current, topic);
202+
}
203+
204+
private void logPartitionDiff(TopicDescription current, Topic topic){
205+
if(current.partitions().size() < topic.getPartitions()){
206+
logger.warn("Current partition count for topic {} is [{}], partition count in config is [{}]. Execute without --preserve-partition-count to make this partition update.", topic.getName(), current.partitions().size(), topic.getPartitions());
207+
208+
} else if(current.partitions().size() > topic.getPartitions()){
209+
logger.warn("Current partition count for topic {} is [{}], partition count in config is [{}].", topic.getName(), current.partitions().size(), topic.getPartitions());
210+
}
211+
}
212+
private void updatePartitions(TopicDescription current, Topic topic){
198213
try {
199214
if(current.partitions().size() < topic.getPartitions()){
200215
logger.debug("Updating partition count for topic {} from [{}] to [{}]", topic.getName(), current.partitions().size(), topic.getPartitions());
@@ -203,7 +218,7 @@ private void handleTopicPartitionsUpdate(TopicDescription current, Topic topic)
203218
} else if(current.partitions().size() > topic.getPartitions()){
204219
throw new RuntimeException("Can not reduce number of partitions for topic [" + topic.getName() + "] from current:" + current.partitions().size() + " to " + topic.getPartitions());
205220
}
206-
} catch(ExecutionException e){
221+
} catch(ExecutionException | InterruptedException e){
207222
throw new RuntimeException(e);
208223
}
209224
}

src/test/java/ConfiguratorTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,29 @@ public void testIncreasePartitions() throws ExecutionException, InterruptedExcep
140140
compareWithKafkaTopic(topic);
141141
}
142142

143+
@Test
144+
public void testPreservePartitions() throws ExecutionException, InterruptedException, ConfigurationException {
145+
Config config = new Config();
146+
String topicName = UUID.randomUUID().toString();
147+
Topic topic = new Topic(topicName, 1, (short)1);
148+
config.getTopics().add(topic);
149+
150+
Configurator configurator = new Configurator(Utils.readProperties(PROPERTIES_LOCATION), config);
151+
configurator.setPreservePartitionCount(true);
152+
configurator.applyConfig();
153+
154+
sleep();
155+
compareWithKafkaTopic(topic);
156+
157+
//Increase the partitions and apply config
158+
topic.setPartitions(2);
159+
configurator.applyConfig();
160+
161+
//Still expect 1 partition
162+
topic.setPartitions(1);
163+
compareWithKafkaTopic(topic);
164+
}
165+
143166
@Test
144167
public void testUpdateExistingTopicConfig() throws ConfigurationException, ExecutionException, InterruptedException {
145168
Config config = new Config();

0 commit comments

Comments
 (0)