From e4e46259ff48f6e243b79d04e4010ad238b2184a Mon Sep 17 00:00:00 2001 From: Tamas Szasz Date: Tue, 15 Jan 2019 22:16:36 +0200 Subject: [PATCH 1/6] Make kafka client unique across multiple pipelines --- lib/logstash/inputs/kafka.rb | 4 +++- logstash-input-kafka.iml | 9 +++++++++ spec/integration/inputs/kafka_spec.rb | 4 ++-- spec/unit/inputs/kafka_spec.rb | 3 ++- 4 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 logstash-input-kafka.iml diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 4ce411e..6151f95 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -131,6 +131,8 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # The class name of the partition assignment strategy that the client will use to distribute # partition ownership amongst consumer instances config :partition_assignment_strategy, :validate => :string + # ID of the pipeline whose events you want to read from. + config :pipeline_id, :validate => :string, :default => "main" # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. config :receive_buffer_bytes, :validate => :string # The amount of time to wait before attempting to reconnect to a given host. @@ -221,7 +223,7 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } + @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}-#{pipeline_id}") } @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } @runner_threads.each { |t| t.join } end # def run diff --git a/logstash-input-kafka.iml b/logstash-input-kafka.iml new file mode 100644 index 0000000..8be92d4 --- /dev/null +++ b/logstash-input-kafka.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 1fa3015..4f178e6 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -13,7 +13,7 @@ let(:group_id_4) {rand(36**8).to_s(36)} let(:group_id_5) {rand(36**8).to_s(36)} let(:plain_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) } + let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3, "pipeline_id" => "spec_pipeline"}) } let(:snappy_config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } let(:lz4_config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } let(:pattern_config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} } @@ -82,7 +82,7 @@ def thread_it(kafka_input, queue) wait(timeout_seconds).for {queue.length}.to eq(num_events) expect(queue.length).to eq(num_events) kafka_input.kafka_consumers.each_with_index do |consumer, i| - expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}") + expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-spec_pipeline") end ensure t.kill diff --git a/spec/unit/inputs/kafka_spec.rb b/spec/unit/inputs/kafka_spec.rb index ed610f5..8dd3735 100644 --- 
a/spec/unit/inputs/kafka_spec.rb +++ b/spec/unit/inputs/kafka_spec.rb @@ -30,7 +30,8 @@ def wakeup end describe LogStash::Inputs::Kafka do - let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4 } } + let(:pipeline_id) { SecureRandom.hex(8)} + let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4, 'pipeline_id' => pipeline_id } } subject { LogStash::Inputs::Kafka.new(config) } it "should register" do From 95a745c9a1e3fac8c027d744a16bcd26ddf167e8 Mon Sep 17 00:00:00 2001 From: Tamas Szasz Date: Tue, 15 Jan 2019 22:33:00 +0200 Subject: [PATCH 2/6] Remove idea file --- logstash-input-kafka.iml | 9 --------- 1 file changed, 9 deletions(-) delete mode 100644 logstash-input-kafka.iml diff --git a/logstash-input-kafka.iml b/logstash-input-kafka.iml deleted file mode 100644 index 8be92d4..0000000 --- a/logstash-input-kafka.iml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file From 5fd3105ea3c4330b38775dd5171af45dccb2222c Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 19 Feb 2019 11:03:43 +0200 Subject: [PATCH 3/6] Update lib/logstash/inputs/kafka.rb as suggested by @robbavey Co-Authored-By: w32-blaster --- lib/logstash/inputs/kafka.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 6151f95..049e6aa 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -132,7 +132,10 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # partition ownership amongst consumer instances config :partition_assignment_strategy, :validate => :string # ID of the pipeline whose events you want to read from. - config :pipeline_id, :validate => :string, :default => "main" + def pipeline_id + respond_to?(:execution_context) ? execution_context.pipeline_id : "main" + end + # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. config :receive_buffer_bytes, :validate => :string # The amount of time to wait before attempting to reconnect to a given host. From b7347f43bc2a35dbb3f5aa2be29d9d2e6bed15d0 Mon Sep 17 00:00:00 2001 From: Tamas Szasz Date: Thu, 21 Feb 2019 19:37:29 +0200 Subject: [PATCH 4/6] use the context to retrieve the pipeline_id --- lib/logstash/inputs/kafka.rb | 1 - spec/unit/inputs/kafka_spec.rb | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 049e6aa..e65202b 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -135,7 +135,6 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base def pipeline_id respond_to?(:execution_context) ? execution_context.pipeline_id : "main" end - # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. config :receive_buffer_bytes, :validate => :string # The amount of time to wait before attempting to reconnect to a given host. 
diff --git a/spec/unit/inputs/kafka_spec.rb b/spec/unit/inputs/kafka_spec.rb index 8dd3735..ed610f5 100644 --- a/spec/unit/inputs/kafka_spec.rb +++ b/spec/unit/inputs/kafka_spec.rb @@ -30,8 +30,7 @@ def wakeup end describe LogStash::Inputs::Kafka do - let(:pipeline_id) { SecureRandom.hex(8)} - let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4, 'pipeline_id' => pipeline_id } } + let(:config) { { 'topics' => ['logstash'], 'consumer_threads' => 4 } } subject { LogStash::Inputs::Kafka.new(config) } it "should register" do From 7d79adc9b744d1f55fd6d2fcca28879ca2ae57c5 Mon Sep 17 00:00:00 2001 From: Tamas Szasz Date: Thu, 21 Feb 2019 20:04:27 +0200 Subject: [PATCH 5/6] modify integration spec --- spec/integration/inputs/kafka_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 4f178e6..4593152 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -13,7 +13,7 @@ let(:group_id_4) {rand(36**8).to_s(36)} let(:group_id_5) {rand(36**8).to_s(36)} let(:plain_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3, "pipeline_id" => "spec_pipeline"}) } + let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) } let(:snappy_config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } let(:lz4_config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } let(:pattern_config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} } @@ -82,7 +82,7 @@ def thread_it(kafka_input, queue) wait(timeout_seconds).for {queue.length}.to eq(num_events) expect(queue.length).to eq(num_events) kafka_input.kafka_consumers.each_with_index do |consumer, i| - expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-spec_pipeline") + expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-main") end ensure t.kill From a008e3cb3ebc6533f54bcf4863296f00e44000c0 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 26 Feb 2019 11:05:18 -0500 Subject: [PATCH 6/6] Clean up integration tests --- lib/logstash/inputs/kafka.rb | 10 +- spec/integration/inputs/kafka_spec.rb | 201 +++++++++----------------- 2 files changed, 75 insertions(+), 136 deletions(-) diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index e65202b..1ba6fbc 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -131,10 +131,6 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base # The class name of the partition assignment strategy that the client will use to distribute # partition ownership amongst consumer instances config :partition_assignment_strategy, :validate => :string - # ID of the pipeline whose events you want to read from. - def pipeline_id - respond_to?(:execution_context) ? execution_context.pipeline_id : "main" - end # The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. config :receive_buffer_bytes, :validate => :string # The amount of time to wait before attempting to reconnect to a given host. 
@@ -360,4 +356,10 @@ def set_sasl_config(props) props.put("sasl.kerberos.service.name",sasl_kerberos_service_name) unless sasl_kerberos_service_name.nil? end + + # ID of the pipeline whose events you want to read from. + def pipeline_id + respond_to?(:execution_context) ? execution_context.pipeline_id : "main" + end + end #class LogStash::Inputs::Kafka diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb index 4593152..91f5572 100644 --- a/spec/integration/inputs/kafka_spec.rb +++ b/spec/integration/inputs/kafka_spec.rb @@ -4,166 +4,103 @@ require "digest" require "rspec/wait" +def thread_it(kafka_input, queue) + Thread.new do + begin + kafka_input.run(queue) + end + end +end + +def run_with_kafka(&block) + queue = Queue.new + t = thread_it(kafka_input, queue) + begin + wait(timeout_seconds).for {queue.length}.to eq(expected_num_events) + yield(queue) + ensure + t.kill + t.join(30_000) + end +end + +shared_examples 'consumes all expected messages' do + it 'should consume all expected messages' do + run_with_kafka do |queue| + expect(queue.length).to eq(expected_num_events) + end + end +end + # Please run kafka_test_setup.sh prior to executing this integration test. describe "inputs/kafka", :integration => true do + subject(:kafka_input) { LogStash::Inputs::Kafka.new(config) } + let(:execution_context) { double("execution_context")} + + before :each do + allow(kafka_input).to receive(:execution_context).and_return(execution_context) + allow(execution_context).to receive(:pipeline_id).and_return(pipeline_id) + end + # Group ids to make sure that the consumers get all the logs. let(:group_id_1) {rand(36**8).to_s(36)} let(:group_id_2) {rand(36**8).to_s(36)} let(:group_id_3) {rand(36**8).to_s(36)} let(:group_id_4) {rand(36**8).to_s(36)} - let(:group_id_5) {rand(36**8).to_s(36)} - let(:plain_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:multi_consumer_config) { plain_config.merge({"group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) } - let(:snappy_config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:lz4_config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } - let(:pattern_config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_2, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} } - let(:decorate_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} } - let(:manual_commit_config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_5, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} } + let(:pipeline_id) {rand(36**8).to_s(36)} + let(:config) { { 'codec' => 'plain', 'auto_offset_reset' => 'earliest'}} let(:timeout_seconds) { 30 } let(:num_events) { 103 } + let(:expected_num_events) { num_events } - describe "#kafka-topics" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) - end - end - end - - it "should consume all messages from plain 3-partition topic" do - kafka_input = LogStash::Inputs::Kafka.new(plain_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to 
eq(num_events) - ensure - t.kill - t.join(30_000) - end - end - - it "should consume all messages from snappy 3-partition topic" do - kafka_input = LogStash::Inputs::Kafka.new(snappy_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - t.kill - t.join(30_000) - end - end + context 'from a plain 3 partition topic' do + let(:config) { super().merge({ 'topics' => ['logstash_topic_plain'], 'group_id' => group_id_1}) } + it_behaves_like 'consumes all expected messages' + end - it "should consume all messages from lz4 3-partition topic" do - kafka_input = LogStash::Inputs::Kafka.new(lz4_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - t.kill - t.join(30_000) - end - end + context 'from snappy 3 partition topic' do + let(:config) { { 'topics' => ['logstash_topic_snappy'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } + it_behaves_like 'consumes all expected messages' + end - it "should consumer all messages with multiple consumers" do - kafka_input = LogStash::Inputs::Kafka.new(multi_consumer_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - kafka_input.kafka_consumers.each_with_index do |consumer, i| - expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-main") - end - ensure - t.kill - t.join(30_000) - end - end + context 'from lz4 3 partition topic' do + let(:config) { { 'topics' => ['logstash_topic_lz4'], 'codec' => 'plain', 'group_id' => group_id_1, 'auto_offset_reset' => 'earliest'} } + it_behaves_like 'consumes all expected messages' end - describe "#kafka-topics-pattern" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) - end - end - end + context 'manually committing' do + let(:config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_2, 'auto_offset_reset' => 'earliest', 'enable_auto_commit' => 'false'} } + it_behaves_like 'consumes all expected messages' + end - it "should consume all messages from all 3 topics" do - kafka_input = LogStash::Inputs::Kafka.new(pattern_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(3*num_events) - expect(queue.length).to eq(3*num_events) - ensure - t.kill - t.join(30_000) - end - end + context 'using a pattern to consume from all 3 topics' do + let(:config) { { 'topics_pattern' => 'logstash_topic_.*', 'group_id' => group_id_3, 'codec' => 'plain', 'auto_offset_reset' => 'earliest'} } + let(:expected_num_events) { 3*num_events } + it_behaves_like 'consumes all expected messages' end - describe "#kafka-decorate" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) + context "with multiple consumers" do + let(:config) { super().merge({'topics' => ['logstash_topic_plain'], "group_id" => group_id_4, "client_id" => "spec", "consumer_threads" => 3}) } + it 'should consume all messages' do + run_with_kafka do |queue| + expect(queue.length).to eq(num_events) + kafka_input.kafka_consumers.each_with_index do |consumer, i| + expect(consumer.metrics.keys.first.tags["client-id"]).to eq("spec-#{i}-#{pipeline_id}") end
end end + end + context 'with decorate events set to true' do + let(:config) { { 'topics' => ['logstash_topic_plain'], 'codec' => 'plain', 'group_id' => group_id_3, 'auto_offset_reset' => 'earliest', 'decorate_events' => true} } it "should show the right topic and group name in decorated kafka section" do start = LogStash::Timestamp.now.time.to_i - kafka_input = LogStash::Inputs::Kafka.new(decorate_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) + run_with_kafka do |queue| expect(queue.length).to eq(num_events) event = queue.shift expect(event.get("[@metadata][kafka][topic]")).to eq("logstash_topic_plain") expect(event.get("[@metadata][kafka][consumer_group]")).to eq(group_id_3) expect(event.get("[@metadata][kafka][timestamp]")).to be >= start - ensure - t.kill - t.join(30_000) - end - end - end - - describe "#kafka-offset-commit" do - def thread_it(kafka_input, queue) - Thread.new do - begin - kafka_input.run(queue) - end - end - end - - it "should manually commit offsets" do - kafka_input = LogStash::Inputs::Kafka.new(manual_commit_config) - queue = Queue.new - t = thread_it(kafka_input, queue) - begin - t.run - wait(timeout_seconds).for {queue.length}.to eq(num_events) - expect(queue.length).to eq(num_events) - ensure - t.kill - t.join(30_000) end end end
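For reference, here is a minimal stand-alone sketch of the client.id scheme the series converges on (patches 3 and 6): one consumer per thread, each tagged with the thread index and the id of the pipeline the input runs in, with "main" as the fallback. ExecutionContext and FakeKafkaInput below are illustrative stand-ins, not classes from logstash-core or this plugin.

# Stand-in for the pipeline context Logstash injects into plugins.
ExecutionContext = Struct.new(:pipeline_id)

class FakeKafkaInput
  attr_reader :client_id, :consumer_threads, :execution_context

  def initialize(client_id:, consumer_threads:, execution_context: nil)
    @client_id = client_id
    @consumer_threads = consumer_threads
    @execution_context = execution_context
  end

  # The real plugin guards with respond_to?(:execution_context); a nil check is
  # enough for this stand-in.
  def pipeline_id
    execution_context ? execution_context.pipeline_id : "main"
  end

  # Mirrors the patched #run: one consumer per thread, each client.id carrying the
  # thread index and the pipeline id, so two pipelines running the same input
  # config no longer collide on Kafka client ids.
  def consumer_client_ids
    consumer_threads.times.map { |i| "#{client_id}-#{i}-#{pipeline_id}" }
  end
end

input = FakeKafkaInput.new(client_id: "logstash", consumer_threads: 2,
                           execution_context: ExecutionContext.new("orders"))
p input.consumer_client_ids # => ["logstash-0-orders", "logstash-1-orders"]
p FakeKafkaInput.new(client_id: "logstash", consumer_threads: 1).consumer_client_ids
# => ["logstash-0-main"]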
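The integration spec above stubs execution_context on the plugin instance; the same technique can be used in a unit spec to exercise the new pipeline_id helper without a broker. A minimal sketch, assuming the plugin's usual logstash-devutils spec helper is on the load path; the "orders" pipeline id is made up.

require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/kafka"

describe LogStash::Inputs::Kafka do
  subject(:input) { described_class.new("topics" => ["logstash"], "client_id" => "spec") }

  it "derives pipeline_id from the execution context" do
    context = double("execution_context", :pipeline_id => "orders")
    # With the stub in place, respond_to?(:execution_context) is true, which is
    # the guard the new pipeline_id helper checks before reading the context.
    allow(input).to receive(:execution_context).and_return(context)
    expect(input.pipeline_id).to eq("orders")
  end
end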
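A quick aside on the nested contexts that build their config with super(): RSpec defines let helpers via define_method, and Ruby rejects the implicit zero-paren super form inside such methods, so the explicit parentheses are required. A plain-Ruby illustration with made-up class names:

class BaseGroup
  def config
    { 'codec' => 'plain' }
  end
end

class BareSuper < BaseGroup
  # Raises at call time: implicit super is not allowed in define_method bodies.
  define_method(:config) { super.merge('topics' => ['logstash_topic_plain']) }
end

class ExplicitSuper < BaseGroup
  # Works: super() explicitly calls BaseGroup#config with no arguments.
  define_method(:config) { super().merge('topics' => ['logstash_topic_plain']) }
end

begin
  BareSuper.new.config
rescue RuntimeError => e
  puts e.message # implicit argument passing of super from method defined by define_method() ...
end

p ExplicitSuper.new.config # => {"codec"=>"plain", "topics"=>["logstash_topic_plain"]}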
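Shared examples resolve let helpers against the including context, which is why each context in the rewritten spec can swap in its own config and expected_num_events. A tiny self-contained illustration of that lookup, with hypothetical names and no Kafka involved:

require "rspec/autorun"

shared_examples 'consumes all expected messages' do
  it 'sees the including context overrides' do
    expect(messages.length).to eq(expected_count)
  end
end

describe "a queue" do
  # Defaults, overridden per context below.
  let(:expected_count) { 1 }
  let(:messages) { ["event"] }

  context "with three messages" do
    let(:expected_count) { 3 }
    let(:messages) { ["a", "b", "c"] }
    it_behaves_like 'consumes all expected messages'
  end

  context "with the defaults" do
    it_behaves_like 'consumes all expected messages'
  end
end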