Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ static final class ConsoleProducerOptions extends CommandDefaultOptions {
private final OptionSpec<String> propertyOpt;
private final OptionSpec<String> readerConfigOpt;
private final OptionSpec<String> producerPropertyOpt;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add deprecate tag like @deprecated(since = "4.2", forRemoval = true)

private OptionSpec<String> commandPropertyOpt;
private final OptionSpec<String> producerConfigOpt;
private OptionSpec<String> commandConfigOpt;

public ConsoleProducerOptions(String[] args) {
super(args);
Expand Down Expand Up @@ -250,11 +252,20 @@ public ConsoleProducerOptions(String[] args) {
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
producerPropertyOpt = parser.accepts("producer-property", "A mechanism to pass user-defined properties in the form key=value to the producer. ")
producerPropertyOpt = parser.accepts("producer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the producer." +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space

Suggested change
producerPropertyOpt = parser.accepts("producer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the producer." +
producerPropertyOpt = parser.accepts("producer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the producer. " +

"This option will be removed in a future version. Use --command-property instead.")
.withRequiredArg()
.describedAs("producer_prop")
.ofType(String.class);
producerConfigOpt = parser.accepts("producer.config", "Producer config properties file. Note that " + producerPropertyOpt + " takes precedence over this config.")
commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the producer.")
.withRequiredArg()
.describedAs("producer_prop")
.ofType(String.class);
producerConfigOpt = parser.accepts("producer.config", "(DEPRECATED) Producer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
commandConfigOpt = parser.accepts("command-config", "Producer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
Expand All @@ -273,6 +284,23 @@ void checkArgs() {

CommandLineUtils.checkRequiredArgs(parser, options, topicOpt);

if (options.has(commandConfigOpt) && options.has(producerConfigOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --command-config and --producer.config cannot be specified together.");
}
if (options.has(commandPropertyOpt) && options.has(producerPropertyOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --command-property and --producer-property cannot be specified together.");
}

if (options.has(producerPropertyOpt)) {
System.out.println("Warning: --producer-property is deprecated and will be removed in a future version. Use --command-property instead.");
commandPropertyOpt = producerPropertyOpt;
}

if (options.has(producerConfigOpt)) {
System.out.println("Warning: --producer.config is deprecated and will be removed in a future version. Use --command-config instead.");
commandConfigOpt = producerConfigOpt;
}

try {
ToolsUtils.validateBootstrapServer(options.valueOf(bootstrapServerOpt));
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -314,11 +342,11 @@ Map<String, String> readerProps() throws IOException {
Properties producerProps() throws IOException {
Properties props = new Properties();

if (options.has(producerConfigOpt)) {
props.putAll(loadProps(options.valueOf(producerConfigOpt)));
if (options.has(commandConfigOpt)) {
props.putAll(loadProps(options.valueOf(commandConfigOpt)));
}

props.putAll(parseKeyValueArgs(options.valuesOf(producerPropertyOpt)));
props.putAll(parseKeyValueArgs(options.valuesOf(commandPropertyOpt)));
props.put(BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServerOpt));
props.put(COMPRESSION_TYPE_CONFIG, compressionCodec());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws IOException {
.describedAs("consume offset")
.ofType(String.class)
.defaultsTo("latest");
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the consumer." +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Deprecated tag

"This option will be removed in a future version. Use --command-property instead.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.")
OptionSpec<String> commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config." +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Space.

"This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
OptionSpec<String> commandConfigOpt = parser.accepts("command-config", "Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
Expand Down Expand Up @@ -170,11 +180,25 @@ public ConsoleConsumerOptions(String[] args) throws IOException {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to read data from Kafka topics and outputs it to standard output.");

checkRequiredArgs();
if (options.has(consumerPropertyOpt) && options.has(commandPropertyOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer-property and --command-property cannot be specified together.");
}
if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer.config and --command-config cannot be specified together.");
}

Properties consumerPropsFromFile = options.has(consumerConfigOpt)
? Utils.loadProps(options.valueOf(consumerConfigOpt))
if (options.has(consumerPropertyOpt)) {
System.out.println("Option --consumer-property is deprecated and will be removed in a future version. Use --command-property instead.");
commandPropertyOpt = consumerPropertyOpt;
}
if (options.has(consumerConfigOpt)) {
System.out.println("Option --consumer.config is deprecated and will be removed in a future version. Use --command-config instead.");
commandConfigOpt = consumerConfigOpt;
}
Properties consumerPropsFromFile = options.has(commandConfigOpt)
? Utils.loadProps(options.valueOf(commandConfigOpt))
: new Properties();
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt));
Set<String> groupIdsProvided = checkConsumerGroup(consumerPropsFromFile, extraConsumerProps);
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
offset = parseOffset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,21 @@ public ConsoleShareConsumerOptions(String[] args) throws IOException {
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
OptionSpec<String> consumerPropertyOpt = parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass user-defined properties in the form key=value to the consumer. " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Deprecated tag

"This option will be removed in a future version. Use --command-property instead.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer-config", "Consumer config properties file. Note that " + consumerPropertyOpt + " takes precedence over this config.")
OptionSpec<String> commandPropertyOpt = parser.accepts("command-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
OptionSpec<String> consumerConfigOpt = parser.accepts("consumer-config", "(DEPRECATED) Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config. " +
"This option will be removed in a future version. Use --command-config instead.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
OptionSpec<String> commandConfigOpt = parser.accepts("command-config", "Consumer config properties file. Note that " + commandPropertyOpt + " takes precedence over this config.")
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
Expand Down Expand Up @@ -141,10 +151,26 @@ public ConsoleShareConsumerOptions(String[] args) throws IOException {
CommandLineUtils.printUsageAndExit(parser, "At most one of --reject and --release may be specified.");
}

Properties consumerPropsFromFile = options.has(consumerConfigOpt)
? Utils.loadProps(options.valueOf(consumerConfigOpt))
if (options.has(consumerPropertyOpt) && options.has(commandPropertyOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer-property and --command-property cannot be specified together.");
}
if (options.has(consumerConfigOpt) && options.has(commandConfigOpt)) {
CommandLineUtils.printUsageAndExit(parser, "Options --consumer-config and --command-config cannot be specified together.");
}

if (options.has(consumerPropertyOpt)) {
System.out.println("Option --consumer-property is deprecated and will be removed in a future version. Use --command-property instead.");
commandPropertyOpt = consumerPropertyOpt;
}
if (options.has(consumerConfigOpt)) {
System.out.println("Option --consumer-config is deprecated and will be removed in a future version. Use --command-config instead.");
commandConfigOpt = consumerConfigOpt;
}

Properties consumerPropsFromFile = options.has(commandConfigOpt)
? Utils.loadProps(options.valueOf(commandConfigOpt))
: new Properties();
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt));
Properties extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(commandPropertyOpt));

Set<String> groupIdsProvided = checkShareGroup(consumerPropsFromFile, extraConsumerProps);
consumerProps = buildConsumerProps(consumerPropsFromFile, extraConsumerProps, groupIdsProvided);
Expand Down
113 changes: 110 additions & 3 deletions tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -57,11 +58,16 @@ public class ConsoleProducerTest {
"--bootstrap-server", "localhost:1002",
"--topic", "t3",
};
private static final String[] CLIENT_ID_OVERRIDE = new String[]{
private static final String[] CLIENT_ID_OVERRIDE_DEPRECATED = new String[]{
"--bootstrap-server", "localhost:1001",
"--topic", "t3",
"--producer-property", "client.id=producer-1"
};
private static final String[] CLIENT_ID_OVERRIDE = new String[]{
"--bootstrap-server", "localhost:1001",
"--topic", "t3",
"--command-property", "client.id=producer-1"
};
private static final String[] BATCH_SIZE_OVERRIDDEN_BY_MAX_PARTITION_MEMORY_BYTES_VALUE = new String[]{
"--bootstrap-server", "localhost:1002",
"--topic", "t3",
Expand Down Expand Up @@ -151,8 +157,8 @@ public void testBootstrapServerOverride() throws IOException {
}

@Test
public void testClientIdOverride() throws IOException {
ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
public void testClientIdOverrideDeprecated() throws IOException {
ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE_DEPRECATED);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());

assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
Expand Down Expand Up @@ -222,6 +228,107 @@ public void testLoopReader() throws Exception {
assertEquals(1, reader.closeCount());
}

@Test
public void shouldExitOnBothProducerPropertyAndCommandProperty() {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});

String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--producer-property", "acks=all",
"--command-property", "batch.size=16384"
};

try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}

@Test
public void shouldExitOnBothProducerConfigAndCommandConfig() throws IOException {
Exit.setExitProcedure((code, message) -> {
throw new IllegalArgumentException(message);
});

Map<String, String> configs = new HashMap<>();
configs.put("acks", "all");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);

Map<String, String> configs2 = new HashMap<>();
configs2.put("batch.size", "16384");
File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);

String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--producer.config", propsFile.getAbsolutePath(),
"--command-config", propsFile2.getAbsolutePath()
};

try {
assertThrows(IllegalArgumentException.class, () -> new ConsoleProducerOptions(args));
} finally {
Exit.resetExitProcedure();
}
}

@Test
public void testClientIdOverrideUsingCommandProperty() throws IOException {
ConsoleProducerOptions opts = new ConsoleProducerOptions(CLIENT_ID_OVERRIDE);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());

assertEquals("producer-1", producerConfig.getString(ProducerConfig.CLIENT_ID_CONFIG));
}

@Test
public void testProducerConfigFromFileUsingCommandConfig() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("acks", "all");
configs.put("batch.size", "32768");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);

String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-config", propsFile.getAbsolutePath()
};

ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());

// "all" gets converted to "-1" internally by ProducerConfig
assertEquals("-1", producerConfig.getString(ProducerConfig.ACKS_CONFIG));
assertEquals(32768, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
}

@Test
public void testCommandPropertyOverridesConfig() throws IOException {
Map<String, String> configs = new HashMap<>();
configs.put("acks", "1");
configs.put("batch.size", "16384");
File propsFile = ToolsTestUtils.tempPropertiesFile(configs);

String[] args = new String[]{
"--bootstrap-server", "localhost:9092",
"--topic", "test",
"--command-config", propsFile.getAbsolutePath(),
"--command-property", "acks=all"
};

ConsoleProducerOptions opts = new ConsoleProducerOptions(args);
ProducerConfig producerConfig = new ProducerConfig(opts.producerProps());

// Command property should override the config file value
// "all" gets converted to "-1" internally by ProducerConfig
assertEquals("-1", producerConfig.getString(ProducerConfig.ACKS_CONFIG));
// Config file value should still be present
assertEquals(16384, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG));
}

public static class TestRecordReader implements RecordReader {
private int configureCount = 0;
private int closeCount = 0;
Expand Down
Loading