From 4f52356c77ad2117e47d35d73c21571778a1d1c4 Mon Sep 17 00:00:00 2001 From: Olexandr Tarasenkov Date: Mon, 19 Jun 2017 15:05:26 +0300 Subject: [PATCH 1/5] Applied timeout for Producer.SendMessageAsync for case when connection is unavailable In that case ProduceResponse Result will have Error = Producer.ErrorOnTimeout (-1) Previously it caused infinite waiting --- src/kafka-net/Common/Extensions.cs | 90 ++++++++++++++++++++++++++++++ src/kafka-net/Producer.cs | 17 +++++- 2 files changed, 105 insertions(+), 2 deletions(-) diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs index 096275c2..4c1b152f 100644 --- a/src/kafka-net/Common/Extensions.cs +++ b/src/kafka-net/Common/Extensions.cs @@ -234,5 +234,95 @@ public static Exception ExtractException(this Task task) return new ApplicationException("Unknown exception occured."); } + + /// + /// https://blogs.msdn.microsoft.com/pfxteam/2011/11/10/crafting-a-task-timeoutafter-method/ + /// + /// + /// + /// + /// exception to be set on timeout + /// result to be set on timeout (in case exceptionOnTimeout is null) + /// + public static Task TimeoutAfter( + this TaskCompletionSource tcs, + int millisecondsTimeout, + Exception exceptionOnTimeout = null, + T resultOnTimeout = default(T) + ) + { + if (tcs == null) throw new ArgumentNullException(nameof(tcs)); + // Short-circuit #1: infinite timeout or task already completed + if (tcs.Task.IsCompleted || (millisecondsTimeout == Timeout.Infinite)) + { + // Either the task has already completed or timeout will never occur. + // No proxy necessary. + return tcs.Task; + } + + // Short-circuit #2: zero timeout + if (millisecondsTimeout == 0) + { + // We've already timed out. + if (exceptionOnTimeout != null) + tcs.SetException(exceptionOnTimeout); + else + tcs.SetResult(resultOnTimeout); + return tcs.Task; + } + + // Set up a timer to complete after the specified timeout period + Timer timer = new Timer(state => + { + // Recover your state information + var myTcs = (TaskCompletionSource)state; + + // Fault our proxy with a TimeoutException + if (exceptionOnTimeout != null) + myTcs.TrySetException(exceptionOnTimeout); + else + myTcs.TrySetResult(resultOnTimeout); + }, tcs, millisecondsTimeout, Timeout.Infinite); + + // Wire up the logic for what happens when source task completes + tcs.Task.ContinueWith((antecedent, state) => + { + // Recover our state data + var tuple = + (Tuple>)state; + + // Cancel the Timer + tuple.Item1.Dispose(); + + // Marshal results to proxy + MarshalTaskResults(antecedent, tuple.Item2); + }, + Tuple.Create(timer, tcs), + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + + return tcs.Task; + } + + internal static void MarshalTaskResults( + Task source, TaskCompletionSource proxy) + { + switch (source.Status) + { + case TaskStatus.Faulted: + proxy.TrySetException(source.Exception); + break; + case TaskStatus.Canceled: + proxy.TrySetCanceled(); + break; + case TaskStatus.RanToCompletion: + Task castedSource = source as Task; + proxy.TrySetResult( + castedSource == null ? default(TResult) : // source is a Task + castedSource.Result); // source is a Task + break; + } + } } } diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index a86dde3d..c7819b2e 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -21,6 +21,7 @@ public class Producer : IMetadataQueries private const int MaximumMessageBuffer = 1000; private const int DefaultBatchDelayMS = 100; private const int DefaultBatchSize = 100; + public const short ErrorOnTimeout = -1; private readonly CancellationTokenSource _stopToken = new CancellationTokenSource(); private readonly int _maximumAsyncRequests; @@ -126,9 +127,9 @@ public async Task> SendMessageAsync(string topic, IEnumera _asyncCollection.AddRange(batch); - await Task.WhenAll(batch.Select(x => x.Tcs.Task)); + await Task.WhenAll(batch.Select(x => x.Task.Value)); - return batch.Select(topicMessage => topicMessage.Tcs.Task.Result) + return batch.Select(topicMessage => topicMessage.Task.Value.Result) .Distinct() .ToList(); } @@ -332,9 +333,21 @@ class TopicMessage public string Topic { get; set; } public Message Message { get; set; } + public Lazy> Task { get; } + public TopicMessage() { Tcs = new TaskCompletionSource(); + Task = new Lazy>(() => { + return Tcs.TimeoutAfter( + (int)Timeout.TotalMilliseconds, + null, + new ProduceResponse() + { + Topic = Topic, + Error = Producer.ErrorOnTimeout + }); + }, LazyThreadSafetyMode.ExecutionAndPublication); } } From 30172459e5256028d419eb07a035d02491ee4f65 Mon Sep 17 00:00:00 2001 From: Olexandr Tarasenkov Date: Mon, 19 Jun 2017 15:47:58 +0300 Subject: [PATCH 2/5] Added UnreachableHost_Timeout_Test --- src/kafka-net/Common/Extensions.cs | 4 ++-- src/kafka-net/Producer.cs | 2 +- .../Integration/ProducerIntegrationTests.cs | 21 +++++++++++++++++++ .../Unit/BigEndianBinaryReaderTests.cs | 9 ++++---- .../Unit/BigEndianBinaryWriterTests.cs | 15 +++++++------ 5 files changed, 35 insertions(+), 16 deletions(-) diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs index 4c1b152f..5fe49305 100644 --- a/src/kafka-net/Common/Extensions.cs +++ b/src/kafka-net/Common/Extensions.cs @@ -265,9 +265,9 @@ public static Task TimeoutAfter( { // We've already timed out. if (exceptionOnTimeout != null) - tcs.SetException(exceptionOnTimeout); + tcs.TrySetException(exceptionOnTimeout); else - tcs.SetResult(resultOnTimeout); + tcs.TrySetResult(resultOnTimeout); return tcs.Task; } diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index c7819b2e..6567ab82 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -127,7 +127,7 @@ public async Task> SendMessageAsync(string topic, IEnumera _asyncCollection.AddRange(batch); - await Task.WhenAll(batch.Select(x => x.Task.Value)); + await Task.WhenAll(batch.Select(x => x.Task.Value)).ConfigureAwait(false); return batch.Select(topicMessage => topicMessage.Task.Value.Result) .Distinct() diff --git a/src/kafka-tests/Integration/ProducerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerIntegrationTests.cs index 6cdcac3a..91c1b1c2 100644 --- a/src/kafka-tests/Integration/ProducerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerIntegrationTests.cs @@ -73,5 +73,26 @@ public async void SendAsyncShouldGetOneResultForEachPartitionThroughBatching() Assert.That(result.Count, Is.EqualTo(2)); } } + + [TestCase(0)] + [TestCase(1)] + [MaxTime(5000)] + public async Task UnreachableHost_Timeout_Test(short acks) + { + using (var router = new BrokerRouter(new KafkaOptions(new Uri("http://localhost:1234")))) + using (var producer = new Producer(router)) + { + var result = await producer.SendMessageAsync( + IntegrationConfig.IntegrationTopic, + new[] { new Message("1") }, + acks, + TimeSpan.FromMilliseconds(200) + ).ConfigureAwait(false); + + Assert.That(result.Count, Is.EqualTo(1)); + Assert.That(result[0].Error, Is.EqualTo(Producer.ErrorOnTimeout)); + } + } + } } diff --git a/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs b/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs index 65fc8fdc..2bd0166d 100644 --- a/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs +++ b/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs @@ -55,9 +55,8 @@ public void Int32Tests(Int32 expectedValue, Byte[] givenBytes) [TestCase((UInt32)0, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase((UInt32)1, new Byte[] { 0x00, 0x00, 0x00, 0x01 })] [TestCase((UInt32)123456789, new Byte[] { 0x07, 0x5B, 0xCD, 0x15 })] - [TestCase(UInt32.MinValue, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase(UInt32.MaxValue, new Byte[] { 0xFF, 0xFF, 0xFF, 0xFF })] - public void UInt32Tests(UInt32 expectedValue, Byte[] givenBytes) + public void UInt32FromBytesTests(UInt32 expectedValue, Byte[] givenBytes) { // arrange var binaryReader = new BigEndianBinaryReader(givenBytes); @@ -142,9 +141,9 @@ public void CharTests(Char expectedValue, Byte[] givenBytes) } [Theory] - [TestCase(new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] - [TestCase(new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] - public void CharArrayTests(Char[] expectedValue, Byte[] givenBytes) + [TestCase(1, new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] + [TestCase(2, new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] + public void CharArrayFromBytesTests(int testCase, Char[] expectedValue, Byte[] givenBytes) { // arrange var binaryReader = new BigEndianBinaryReader(givenBytes); diff --git a/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs b/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs index 7470cdb2..31d83878 100644 --- a/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs +++ b/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs @@ -79,9 +79,8 @@ public void Int16Tests(Int16 number, Byte[] expectedBytes) [TestCase((UInt32)0, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase((UInt32)1, new Byte[] { 0x00, 0x00, 0x00, 0x01 })] [TestCase((UInt32)123456789, new Byte[] { 0x07, 0x5B, 0xCD, 0x15 })] - [TestCase(UInt32.MinValue, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase(UInt32.MaxValue, new Byte[] { 0xFF, 0xFF, 0xFF, 0xFF })] - public void UInt32Tests(UInt32 number, Byte[] expectedBytes) + public void UInt32ToBytesTests(UInt32 number, Byte[] expectedBytes) { // arrange var memoryStream = new MemoryStream(); @@ -197,9 +196,9 @@ public void CharTests(Char value, Byte[] expectedBytes) } [Theory] - [TestCase(new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] - [TestCase(new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] - public void CharArrayTests(Char[] value, Byte[] expectedBytes) + [TestCase(1, new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] + [TestCase(2, new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] + public void CharArrayToBytesTests(int testCase, Char[] value, Byte[] expectedBytes) { // arrange var memoryStream = new MemoryStream(); @@ -214,9 +213,9 @@ public void CharArrayTests(Char[] value, Byte[] expectedBytes) } [Theory] - [TestCase(new Char[] { '0', '1', '2', '3' }, 1, 2, new Byte[] { 0x31, 0x32 })] - [TestCase(new Char[] { '€', '2', '€', '€' }, 1, 2, new Byte[] { 0x32, 0xE2, 0x82, 0xAC })] - public void CharSubArrayTests(Char[] value, Int32 index, Int32 count, Byte[] expectedBytes) + [TestCase(1, new Char[] { '0', '1', '2', '3' }, 1, 2, new Byte[] { 0x31, 0x32 })] + [TestCase(2, new Char[] { '€', '2', '€', '€' }, 1, 2, new Byte[] { 0x32, 0xE2, 0x82, 0xAC })] + public void CharSubArrayTests(int testCase, Char[] value, Int32 index, Int32 count, Byte[] expectedBytes) { // arrange var memoryStream = new MemoryStream(); From e2d30322273dd8c1fddbf5c32a5a7943624cfc06 Mon Sep 17 00:00:00 2001 From: Olexandr Tarasenkov Date: Mon, 19 Jun 2017 15:52:35 +0300 Subject: [PATCH 3/5] Fix for appveyor build --- src/kafka-net/Producer.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index 6567ab82..03a94482 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -333,7 +333,7 @@ class TopicMessage public string Topic { get; set; } public Message Message { get; set; } - public Lazy> Task { get; } + public Lazy> Task { get; private set; } public TopicMessage() { From 7593729e1d7e6511b4f0ca4c36d8e6209f1a5c95 Mon Sep 17 00:00:00 2001 From: Olexandr Tarasenkov Date: Mon, 19 Jun 2017 15:54:54 +0300 Subject: [PATCH 4/5] Fix for appveyor build #2 --- src/kafka-net/Common/Extensions.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs index 5fe49305..1b2545ad 100644 --- a/src/kafka-net/Common/Extensions.cs +++ b/src/kafka-net/Common/Extensions.cs @@ -251,7 +251,7 @@ public static Task TimeoutAfter( T resultOnTimeout = default(T) ) { - if (tcs == null) throw new ArgumentNullException(nameof(tcs)); + if (tcs == null) throw new ArgumentNullException("tcs"); // Short-circuit #1: infinite timeout or task already completed if (tcs.Task.IsCompleted || (millisecondsTimeout == Timeout.Infinite)) { From 0903b1b1c400b835bdc0f916868f90936edffdae Mon Sep 17 00:00:00 2001 From: Olexandr Tarasenkov Date: Mon, 19 Jun 2017 16:13:28 +0300 Subject: [PATCH 5/5] Fix for test ProducesShouldSendExpectedProduceRequestForEachAckLevelAndTimeoutCombination to pass --- src/kafka-net/Producer.cs | 32 +++++++++---------- .../Integration/ProducerIntegrationTests.cs | 2 +- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index 03a94482..1ca94e6e 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -17,6 +17,7 @@ public class Producer : IMetadataQueries { private const int MaxDisposeWaitSeconds = 30; private const int DefaultAckTimeoutMS = 1000; + private const int DefaultMessageTimeoutMS = DefaultAckTimeoutMS + 1000; private const int MaximumAsyncRequests = 20; private const int MaximumMessageBuffer = 1000; private const int DefaultBatchDelayMS = 100; @@ -110,13 +111,14 @@ public Producer(IBrokerRouter brokerRouter, int maximumAsyncRequests = MaximumAs /// The codec to apply to the message collection. Defaults to none. /// List of ProduceResponses from each partition sent to or empty list if acks = 0. public async Task> SendMessageAsync(string topic, IEnumerable messages, Int16 acks = 1, - TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone) + TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone, TimeSpan? messageTimeout = null) { if (_stopToken.IsCancellationRequested) throw new ObjectDisposedException("Cannot send new documents as producer is disposing."); if (timeout == null) timeout = TimeSpan.FromMilliseconds(DefaultAckTimeoutMS); + if (messageTimeout == null) messageTimeout = TimeSpan.FromMilliseconds(DefaultMessageTimeoutMS); - var batch = messages.Select(message => new TopicMessage + var batch = messages.Select(message => new TopicMessage(messageTimeout.Value) { Acks = acks, Codec = codec, @@ -127,9 +129,9 @@ public async Task> SendMessageAsync(string topic, IEnumera _asyncCollection.AddRange(batch); - await Task.WhenAll(batch.Select(x => x.Task.Value)).ConfigureAwait(false); + await Task.WhenAll(batch.Select(x => x.Tcs.Task)).ConfigureAwait(false); - return batch.Select(topicMessage => topicMessage.Task.Value.Result) + return batch.Select(topicMessage => topicMessage.Tcs.Task.Result) .Distinct() .ToList(); } @@ -333,21 +335,17 @@ class TopicMessage public string Topic { get; set; } public Message Message { get; set; } - public Lazy> Task { get; private set; } - - public TopicMessage() + public TopicMessage(TimeSpan messageTimeout) { Tcs = new TaskCompletionSource(); - Task = new Lazy>(() => { - return Tcs.TimeoutAfter( - (int)Timeout.TotalMilliseconds, - null, - new ProduceResponse() - { - Topic = Topic, - Error = Producer.ErrorOnTimeout - }); - }, LazyThreadSafetyMode.ExecutionAndPublication); + Tcs.TimeoutAfter( + (int)messageTimeout.TotalMilliseconds, + null, + new ProduceResponse() + { + Topic = Topic, + Error = Producer.ErrorOnTimeout + }); } } diff --git a/src/kafka-tests/Integration/ProducerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerIntegrationTests.cs index 91c1b1c2..c57f85f2 100644 --- a/src/kafka-tests/Integration/ProducerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerIntegrationTests.cs @@ -86,7 +86,7 @@ public async Task UnreachableHost_Timeout_Test(short acks) IntegrationConfig.IntegrationTopic, new[] { new Message("1") }, acks, - TimeSpan.FromMilliseconds(200) + messageTimeout: TimeSpan.FromMilliseconds(200) ).ConfigureAwait(false); Assert.That(result.Count, Is.EqualTo(1));