Skip to content

Commit 1680709

Browse files
committed
remarks
1 parent 8f0abf4 commit 1680709

File tree

6 files changed

+48
-27
lines changed

6 files changed

+48
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 2.8.13 (Unreleased)
44
- [Enhancement] Make `fenced` error skip-reload behavior configurable via new `non_reloadable_errors` setting (defaults to `[:fenced]` for backward compatibility).
55
- [Enhancement] Add `producer.reload` event allowing config modification before reload to escape fencing loops (#706).
6+
- [Enhancement] Do not early initialize the new instance on reload.
67

78
## 2.8.12 (2025-10-10)
89
- [Enhancement] Introduce `reload_on_idempotent_fatal_error` to automatically reload librdkafka producer after fatal errors on idempotent (non-transactional) producers.

lib/waterdrop/producer.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,5 +559,15 @@ def produce(message)
559559
ensure
560560
@operations_in_progress.decrement
561561
end
562+
563+
# Reloads the client
564+
# @note This should be used only within proper mutexes internally
565+
def reload!
566+
@client.flush(current_variant.max_wait_timeout)
567+
purge
568+
@client.close
569+
@client = nil
570+
@status.configured!
571+
end
562572
end
563573
end

lib/waterdrop/producer/idempotence.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,7 @@ def idempotent_reload_client_on_fatal_error(attempt, error)
7777
producer_id: id,
7878
attempt: attempt
7979
) do
80-
@client.flush(current_variant.max_wait_timeout)
81-
purge
82-
@client.close
83-
@client = Builder.new.call(self, @config)
80+
reload!
8481
end
8582
end
8683
end

lib/waterdrop/producer/transactions.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def transaction
6363
@transaction_mutex.synchronize do
6464
ensure_active!
6565

66+
@reloaded = false
67+
6668
transactional_instrument(:finished) do
6769
with_transactional_error_handling(:begin) do
6870
transactional_instrument(:started) { client.begin_transaction }
@@ -285,6 +287,11 @@ def transactional_reload_client_if_needed(error)
285287

286288
# Check if we've exceeded max reload attempts
287289
return unless transactional_retryable?
290+
# We bubble up transactional errors, so there are cases where when fencing is not
291+
# considered a non-reloadable, two layers of error handling would attempt to reload the
292+
# client causing double reload. This halts reload if we're in a configured state as it
293+
# means, we've already reloaded and we are not even yet connected
294+
return if @status.configured?
288295

289296
# Increment attempts before reload
290297
@transaction_fatal_error_attempts += 1
@@ -304,16 +311,14 @@ def transactional_reload_client_if_needed(error)
304311
# Clear cached state that depends on config
305312
# We always clear @transactional as it might have been modified via the event
306313
@transactional = nil
314+
@reloaded = true
307315

308316
@monitor.instrument(
309317
'producer.reloaded',
310318
producer_id: id,
311319
attempt: @transaction_fatal_error_attempts
312320
) do
313-
@client.flush(current_variant.max_wait_timeout)
314-
purge
315-
@client.close
316-
@client = Builder.new.call(self, @config)
321+
reload!
317322
end
318323
end
319324

spec/integrations/fatal_error_recovery/fencing_escape_with_reload_event_spec.rb

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,20 @@
3939

4040
# Subscribe to producer.reload event and modify transactional.id to escape fencing
4141
producer1.monitor.subscribe('producer.reload') do |event|
42-
# When fenced, rotate to a new transactional.id
43-
if event[:error].code == :fenced
44-
new_id = "#{TRANSACTIONAL_ID}-recovered-#{Time.now.to_i}"
45-
event[:caller].config.kafka[:'transactional.id'] = new_id
46-
puts "Fencing detected! Rotating transactional.id to: #{new_id}"
47-
end
42+
config = event[:caller].config
43+
config.kafka[:'transactional.id'] = "#{TRANSACTIONAL_ID}-recovered-#{Time.now.to_i}"
4844
end
4945

5046
producer1.monitor.subscribe('producer.reloaded') { |event| reload_events << event }
5147
producer1.monitor.subscribe('error.occurred') { |event| error_events << event }
5248

49+
topic_name = "it-fence-escape-#{SecureRandom.hex(6)}"
50+
51+
# First transaction with producer1
52+
producer1.transaction do
53+
producer1.produce_sync(topic: topic_name, payload: 'message1')
54+
end
55+
5356
# Create second producer with same ID to cause fencing
5457
producer2 = WaterDrop::Producer.new do |config|
5558
config.kafka = {
@@ -62,13 +65,6 @@
6265
config.logger = Logger.new($stdout, level: Logger::INFO)
6366
end
6467

65-
topic_name = "it-fence-escape-#{SecureRandom.hex(6)}"
66-
67-
# First transaction with producer1
68-
producer1.transaction do
69-
producer1.produce_sync(topic: topic_name, payload: 'message1')
70-
end
71-
7268
# This transaction will fence producer1
7369
producer2.transaction do
7470
producer2.produce_sync(topic: topic_name, payload: 'message2')
@@ -80,8 +76,19 @@
8076
producer1.produce_sync(topic: topic_name, payload: 'message3-recovered')
8177
end
8278
rescue Rdkafka::RdkafkaError => e
83-
puts "Failed after reload: #{e.message}"
84-
exit(1)
79+
# This is expected. User needs to retry transaction if wants
80+
# Reloading does not mean, that fencing is not re-raised in the transactional mode
81+
exit(1) unless e.code == :fenced
82+
end
83+
84+
10.times do
85+
producer1.transaction do
86+
producer1.produce_sync(topic: topic_name, payload: 'message3-recovered')
87+
end
88+
89+
producer2.transaction do
90+
producer2.produce_sync(topic: topic_name, payload: 'message2')
91+
end
8592
end
8693

8794
producer1.close
@@ -91,7 +98,4 @@
9198
# Should have exactly 1 reload (not multiple like in the loop case)
9299
success = reload_events.size == 1 && reload_events.first[:attempt] == 1
93100

94-
puts "Reload events: #{reload_events.size}"
95-
puts "Success: #{success}"
96-
97101
exit(success ? 0 : 1)

spec/integrations/fatal_error_recovery/fencing_with_reload_loop_spec.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,14 @@ def configure_producer
6666
producer2.produce_sync(topic: topic_name, payload: 'message2')
6767
end
6868

69-
# This will trigger reload loop: fenced -> reload -> fenced -> reload...
7069
producer1.transaction do
7170
producer1.produce_sync(topic: topic_name, payload: 'message3')
7271
end
72+
73+
# This will trigger reload loop: fenced -> reload -> fenced -> reload...
74+
producer2.transaction do
75+
producer2.produce_sync(topic: topic_name, payload: 'message2')
76+
end
7377
rescue Rdkafka::RdkafkaError => e
7478
exit(1) unless e.code == :fenced
7579

0 commit comments

Comments
 (0)