diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb index 9f3f32d..4cd2b7c 100644 --- a/lib/logstash/inputs/kafka.rb +++ b/lib/logstash/inputs/kafka.rb @@ -221,8 +221,12 @@ def register public def run(logstash_queue) - @runner_consumers = consumer_threads.times.map { |i| create_consumer("#{client_id}-#{i}") } - @runner_threads = @runner_consumers.map { |consumer| thread_runner(logstash_queue, consumer) } + @runner_consumers = consumer_threads.times.map do |index| + create_consumer("#{client_id}-#{index}") + end + @runner_threads = @runner_consumers.map.with_index do |consumer, index| + thread_runner(logstash_queue, consumer, index) + end @runner_threads.each { |t| t.join } end # def run @@ -237,9 +241,11 @@ def kafka_consumers end private - def thread_runner(logstash_queue, consumer) + def thread_runner(logstash_queue, consumer, consumer_index) + consumer_identifier = "#{client_id}-#{consumer_index}" Thread.new do begin + logger.info("opening consumer #{consumer_identifier}") unless @topics_pattern.nil? nooplistener = org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener.new pattern = java.util.regex.Pattern.compile(@topics_pattern) @@ -271,8 +277,15 @@ def thread_runner(logstash_queue, consumer) end end rescue org.apache.kafka.common.errors.WakeupException => e - raise e if !stop? + unless stop? + logger.error("wakeup exception in consumer #{consumer_identifier}: #{e}") + raise e + end + rescue => e + logger.error("uncaught exception in consumer #{consumer_identifier}: #{e}") + raise e ensure + logger.info("closing consumer #{consumer_identifier}") consumer.close end end