Skip to content

Commit 503939b

Browse files
authored
Allow producer config alterations on reload (#710)
* remarks * do not include lock for integrations * allow reload alterations * bump version * remarks * remove reloaded
1 parent 103d6d1 commit 503939b

File tree

11 files changed

+239
-96
lines changed

11 files changed

+239
-96
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# WaterDrop changelog
22

3-
## Unreleased
3+
## 2.8.13 (Unreleased)
44
- [Enhancement] Make `fenced` error skip-reload behavior configurable via new `non_reloadable_errors` setting (defaults to `[:fenced]` for backward compatibility).
5+
- [Enhancement] Add `producer.reload` event allowing config modification before reload to escape fencing loops (#706).
6+
- [Enhancement] Do not eagerly initialize the new client instance on reload.
57

68
## 2.8.12 (2025-10-10)
79
- [Enhancement] Introduce `reload_on_idempotent_fatal_error` to automatically reload librdkafka producer after fatal errors on idempotent (non-transactional) producers.

Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
waterdrop (2.8.12)
4+
waterdrop (2.8.13)
55
karafka-core (>= 2.4.9, < 3.0.0)
66
karafka-rdkafka (>= 0.20.0)
77
zeitwerk (~> 2.3)

lib/waterdrop/instrumentation/notifications.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class Notifications < ::Karafka::Core::Monitoring::Notifications
1010
producer.connected
1111
producer.closing
1212
producer.closed
13+
producer.reload
1314
producer.reloaded
1415
producer.disconnecting
1516
producer.disconnected

lib/waterdrop/producer.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ def produce(message)
502502
)
503503

504504
# Attempt to reload the producer
505-
idempotent_reload_client_on_fatal_error(@idempotent_fatal_error_attempts)
505+
idempotent_reload_client_on_fatal_error(@idempotent_fatal_error_attempts, e)
506506

507507
# Wait before retrying to avoid rapid reload loops
508508
sleep(@config.wait_backoff_on_idempotent_fatal_error / 1_000.0)
@@ -559,5 +559,15 @@ def produce(message)
559559
ensure
560560
@operations_in_progress.decrement
561561
end
562+
563+
# Reloads the client
564+
# @note Internal use only: call this only while holding the appropriate mutex
565+
def reload!
566+
@client.flush(current_variant.max_wait_timeout)
567+
purge
568+
@client.close
569+
@client = nil
570+
@status.configured!
571+
end
562572
end
563573
end

lib/waterdrop/producer/idempotence.rb

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,21 +50,34 @@ def idempotent_retryable?
5050
# old client, and create a new client instance to continue operations.
5151
#
5252
# @param attempt [Integer] the current reload attempt number
53+
# @param error [Rdkafka::RdkafkaError] the error that triggered the reload
5354
#
5455
# @note This is only called for idempotent, non-transactional producers when
5556
# `reload_on_idempotent_fatal_error` is enabled
5657
# @note After reload, the producer will automatically retry the failed operation
57-
def idempotent_reload_client_on_fatal_error(attempt)
58+
def idempotent_reload_client_on_fatal_error(attempt, error)
5859
@operating_mutex.synchronize do
60+
# Emit producer.reload event before reload
61+
# Users can subscribe to this event and modify event[:caller].config.kafka to change
62+
# producer config
63+
@monitor.instrument(
64+
'producer.reload',
65+
producer_id: id,
66+
error: error,
67+
attempt: attempt,
68+
caller: self
69+
)
70+
71+
# Clear cached state that depends on config
72+
# We always clear @idempotent as it might have been modified via the event
73+
@idempotent = nil
74+
5975
@monitor.instrument(
6076
'producer.reloaded',
6177
producer_id: id,
6278
attempt: attempt
6379
) do
64-
@client.flush(current_variant.max_wait_timeout)
65-
purge
66-
@client.close
67-
@client = Builder.new.call(self, @config)
80+
reload!
6881
end
6982
end
7083
end

lib/waterdrop/producer/transactions.rb

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,20 +285,37 @@ def transactional_reload_client_if_needed(error)
285285

286286
# Check if we've exceeded max reload attempts
287287
return unless transactional_retryable?
288+
# We bubble up transactional errors, so when fencing is not configured as a
289+
# non-reloadable error, two layers of error handling could each attempt to reload the
290+
# client, causing a double reload. This halts the reload if we are in a configured state,
291+
# as that means we have already reloaded and are not yet connected
292+
return if @status.configured?
288293

289294
# Increment attempts before reload
290295
@transaction_fatal_error_attempts += 1
291296

292297
@operating_mutex.synchronize do
298+
# Emit producer.reload event before reload
299+
# Users can subscribe to this event and modify event[:caller].config.kafka to change
300+
# producer config. This is useful for escaping fencing loops by changing transactional.id
301+
@monitor.instrument(
302+
'producer.reload',
303+
producer_id: id,
304+
error: rd_error,
305+
attempt: @transaction_fatal_error_attempts,
306+
caller: self
307+
)
308+
309+
# Clear cached state that depends on config
310+
# We always clear @transactional as it might have been modified via the event
311+
@transactional = nil
312+
293313
@monitor.instrument(
294314
'producer.reloaded',
295315
producer_id: id,
296316
attempt: @transaction_fatal_error_attempts
297317
) do
298-
@client.flush(current_variant.max_wait_timeout)
299-
purge
300-
@client.close
301-
@client = Builder.new.call(self, @config)
318+
reload!
302319
end
303320
end
304321

lib/waterdrop/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,5 @@
33
# WaterDrop library
44
module WaterDrop
55
# Current WaterDrop version
6-
VERSION = '2.8.12'
6+
VERSION = '2.8.13'
77
end

spec/integrations/fatal_error_recovery/README.md

Lines changed: 0 additions & 81 deletions
This file was deleted.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# frozen_string_literal: true
2+
3+
# Integration test demonstrating how to escape producer fencing by using the producer.reload
4+
# event to modify transactional.id. This test shows the recommended pattern for handling fencing
5+
# in production environments.
6+
#
7+
# When a producer gets fenced, simply reloading with the same transactional.id creates an
8+
# infinite loop. But by subscribing to the producer.reload event and modifying the
9+
# transactional.id, the producer can escape fencing and continue operating with a new identity.
10+
11+
require 'waterdrop'
12+
require 'logger'
13+
require 'securerandom'
14+
15+
BOOTSTRAP_SERVERS = ENV.fetch('BOOTSTRAP_SERVERS', '127.0.0.1:9092')
16+
# Same ID for both producers initially
17+
TRANSACTIONAL_ID = "fence-escape-test-#{SecureRandom.uuid}".freeze
18+
19+
# Track instrumentation events
20+
reload_events = []
21+
error_events = []
22+
23+
# Create first producer with reload enabled
24+
producer1 = WaterDrop::Producer.new do |config|
25+
config.kafka = {
26+
'bootstrap.servers': BOOTSTRAP_SERVERS,
27+
'transactional.id': TRANSACTIONAL_ID,
28+
'transaction.timeout.ms': 30_000,
29+
'message.timeout.ms': 30_000
30+
}
31+
config.max_wait_timeout = 5_000
32+
config.logger = Logger.new($stdout, level: Logger::INFO)
33+
config.reload_on_transaction_fatal_error = true
34+
# IMPORTANT: Remove :fenced from non_reloadable_errors to allow reload attempts
35+
config.non_reloadable_errors = []
36+
config.max_attempts_on_transaction_fatal_error = 5
37+
config.wait_backoff_on_transaction_fatal_error = 100
38+
end
39+
40+
# Subscribe to producer.reload event and modify transactional.id to escape fencing
41+
producer1.monitor.subscribe('producer.reload') do |event|
42+
config = event[:caller].config
43+
config.kafka[:'transactional.id'] = "#{TRANSACTIONAL_ID}-recovered-#{Time.now.to_i}"
44+
end
45+
46+
producer1.monitor.subscribe('producer.reloaded') { |event| reload_events << event }
47+
producer1.monitor.subscribe('error.occurred') { |event| error_events << event }
48+
49+
topic_name = "it-fence-escape-#{SecureRandom.hex(6)}"
50+
51+
# First transaction with producer1
52+
producer1.transaction do
53+
producer1.produce_sync(topic: topic_name, payload: 'message1')
54+
end
55+
56+
# Create second producer with same ID to cause fencing
57+
producer2 = WaterDrop::Producer.new do |config|
58+
config.kafka = {
59+
'bootstrap.servers': BOOTSTRAP_SERVERS,
60+
'transactional.id': TRANSACTIONAL_ID,
61+
'transaction.timeout.ms': 30_000,
62+
'message.timeout.ms': 30_000
63+
}
64+
config.max_wait_timeout = 5_000
65+
config.logger = Logger.new($stdout, level: Logger::INFO)
66+
end
67+
68+
# This transaction will fence producer1
69+
producer2.transaction do
70+
producer2.produce_sync(topic: topic_name, payload: 'message2')
71+
end
72+
73+
# This should trigger a successful reload with a changed transactional.id
74+
begin
75+
producer1.transaction do
76+
producer1.produce_sync(topic: topic_name, payload: 'message3-recovered')
77+
end
78+
rescue Rdkafka::RdkafkaError => e
79+
# This is expected: the user must retry the transaction if desired.
80+
# Reloading does not prevent the fenced error from being re-raised in transactional mode
81+
exit(1) unless e.code == :fenced
82+
end
83+
84+
10.times do
85+
producer1.transaction do
86+
producer1.produce_sync(topic: topic_name, payload: 'message3-recovered')
87+
end
88+
89+
producer2.transaction do
90+
producer2.produce_sync(topic: topic_name, payload: 'message2')
91+
end
92+
end
93+
94+
producer1.close
95+
producer2.close
96+
97+
# Verify results
98+
# Should have exactly 1 reload (not multiple like in the loop case)
99+
success = reload_events.size == 1 && reload_events.first[:attempt] == 1
100+
101+
exit(success ? 0 : 1)

spec/integrations/fatal_error_recovery/fencing_with_reload_loop_spec.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,14 @@ def configure_producer
6666
producer2.produce_sync(topic: topic_name, payload: 'message2')
6767
end
6868

69-
# This will trigger reload loop: fenced -> reload -> fenced -> reload...
7069
producer1.transaction do
7170
producer1.produce_sync(topic: topic_name, payload: 'message3')
7271
end
72+
73+
# This will trigger reload loop: fenced -> reload -> fenced -> reload...
74+
producer2.transaction do
75+
producer2.produce_sync(topic: topic_name, payload: 'message2')
76+
end
7377
rescue Rdkafka::RdkafkaError => e
7478
exit(1) unless e.code == :fenced
7579

0 commit comments

Comments
 (0)