diff --git a/src/kafka-net/Common/Extensions.cs b/src/kafka-net/Common/Extensions.cs index 096275c2..1b2545ad 100644 --- a/src/kafka-net/Common/Extensions.cs +++ b/src/kafka-net/Common/Extensions.cs @@ -234,5 +234,95 @@ public static Exception ExtractException(this Task task) return new ApplicationException("Unknown exception occured."); } + + /// + /// https://blogs.msdn.microsoft.com/pfxteam/2011/11/10/crafting-a-task-timeoutafter-method/ + /// + /// + /// + /// + /// exception to be set on timeout + /// result to be set on timeout (in case exceptionOnTimeout is null) + /// + public static Task TimeoutAfter( + this TaskCompletionSource tcs, + int millisecondsTimeout, + Exception exceptionOnTimeout = null, + T resultOnTimeout = default(T) + ) + { + if (tcs == null) throw new ArgumentNullException("tcs"); + // Short-circuit #1: infinite timeout or task already completed + if (tcs.Task.IsCompleted || (millisecondsTimeout == Timeout.Infinite)) + { + // Either the task has already completed or timeout will never occur. + // No proxy necessary. + return tcs.Task; + } + + // Short-circuit #2: zero timeout + if (millisecondsTimeout == 0) + { + // We've already timed out. + if (exceptionOnTimeout != null) + tcs.TrySetException(exceptionOnTimeout); + else + tcs.TrySetResult(resultOnTimeout); + return tcs.Task; + } + + // Set up a timer to complete after the specified timeout period + Timer timer = new Timer(state => + { + // Recover your state information + var myTcs = (TaskCompletionSource)state; + + // Fault our proxy with a TimeoutException + if (exceptionOnTimeout != null) + myTcs.TrySetException(exceptionOnTimeout); + else + myTcs.TrySetResult(resultOnTimeout); + }, tcs, millisecondsTimeout, Timeout.Infinite); + + // Wire up the logic for what happens when source task completes + tcs.Task.ContinueWith((antecedent, state) => + { + // Recover our state data + var tuple = + (Tuple>)state; + + // Cancel the Timer + tuple.Item1.Dispose(); + + // Marshal results to proxy + MarshalTaskResults(antecedent, tuple.Item2); + }, + Tuple.Create(timer, tcs), + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + + return tcs.Task; + } + + internal static void MarshalTaskResults( + Task source, TaskCompletionSource proxy) + { + switch (source.Status) + { + case TaskStatus.Faulted: + proxy.TrySetException(source.Exception); + break; + case TaskStatus.Canceled: + proxy.TrySetCanceled(); + break; + case TaskStatus.RanToCompletion: + Task castedSource = source as Task; + proxy.TrySetResult( + castedSource == null ? default(TResult) : // source is a Task + castedSource.Result); // source is a Task + break; + } + } } } diff --git a/src/kafka-net/Producer.cs b/src/kafka-net/Producer.cs index a86dde3d..1ca94e6e 100644 --- a/src/kafka-net/Producer.cs +++ b/src/kafka-net/Producer.cs @@ -17,10 +17,12 @@ public class Producer : IMetadataQueries { private const int MaxDisposeWaitSeconds = 30; private const int DefaultAckTimeoutMS = 1000; + private const int DefaultMessageTimeoutMS = DefaultAckTimeoutMS + 1000; private const int MaximumAsyncRequests = 20; private const int MaximumMessageBuffer = 1000; private const int DefaultBatchDelayMS = 100; private const int DefaultBatchSize = 100; + public const short ErrorOnTimeout = -1; private readonly CancellationTokenSource _stopToken = new CancellationTokenSource(); private readonly int _maximumAsyncRequests; @@ -109,13 +111,14 @@ public Producer(IBrokerRouter brokerRouter, int maximumAsyncRequests = MaximumAs /// The codec to apply to the message collection. Defaults to none. /// List of ProduceResponses from each partition sent to or empty list if acks = 0. public async Task> SendMessageAsync(string topic, IEnumerable messages, Int16 acks = 1, - TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone) + TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone, TimeSpan? messageTimeout = null) { if (_stopToken.IsCancellationRequested) throw new ObjectDisposedException("Cannot send new documents as producer is disposing."); if (timeout == null) timeout = TimeSpan.FromMilliseconds(DefaultAckTimeoutMS); + if (messageTimeout == null) messageTimeout = TimeSpan.FromMilliseconds(DefaultMessageTimeoutMS); - var batch = messages.Select(message => new TopicMessage + var batch = messages.Select(message => new TopicMessage(messageTimeout.Value) { Acks = acks, Codec = codec, @@ -126,7 +129,7 @@ public async Task> SendMessageAsync(string topic, IEnumera _asyncCollection.AddRange(batch); - await Task.WhenAll(batch.Select(x => x.Tcs.Task)); + await Task.WhenAll(batch.Select(x => x.Tcs.Task)).ConfigureAwait(false); return batch.Select(topicMessage => topicMessage.Tcs.Task.Result) .Distinct() @@ -332,9 +335,17 @@ class TopicMessage public string Topic { get; set; } public Message Message { get; set; } - public TopicMessage() + public TopicMessage(TimeSpan messageTimeout) { Tcs = new TaskCompletionSource(); + Tcs.TimeoutAfter( + (int)messageTimeout.TotalMilliseconds, + null, + new ProduceResponse() + { + Topic = Topic, + Error = Producer.ErrorOnTimeout + }); } } diff --git a/src/kafka-tests/Integration/ProducerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerIntegrationTests.cs index 6cdcac3a..c57f85f2 100644 --- a/src/kafka-tests/Integration/ProducerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerIntegrationTests.cs @@ -73,5 +73,26 @@ public async void SendAsyncShouldGetOneResultForEachPartitionThroughBatching() Assert.That(result.Count, Is.EqualTo(2)); } } + + [TestCase(0)] + [TestCase(1)] + [MaxTime(5000)] + public async Task UnreachableHost_Timeout_Test(short acks) + { + using (var router = new BrokerRouter(new KafkaOptions(new Uri("http://localhost:1234")))) + using (var producer = new Producer(router)) + { + var result = await producer.SendMessageAsync( + IntegrationConfig.IntegrationTopic, + new[] { new Message("1") }, + acks, + messageTimeout: TimeSpan.FromMilliseconds(200) + ).ConfigureAwait(false); + + Assert.That(result.Count, Is.EqualTo(1)); + Assert.That(result[0].Error, Is.EqualTo(Producer.ErrorOnTimeout)); + } + } + } } diff --git a/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs b/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs index 65fc8fdc..2bd0166d 100644 --- a/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs +++ b/src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs @@ -55,9 +55,8 @@ public void Int32Tests(Int32 expectedValue, Byte[] givenBytes) [TestCase((UInt32)0, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase((UInt32)1, new Byte[] { 0x00, 0x00, 0x00, 0x01 })] [TestCase((UInt32)123456789, new Byte[] { 0x07, 0x5B, 0xCD, 0x15 })] - [TestCase(UInt32.MinValue, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase(UInt32.MaxValue, new Byte[] { 0xFF, 0xFF, 0xFF, 0xFF })] - public void UInt32Tests(UInt32 expectedValue, Byte[] givenBytes) + public void UInt32FromBytesTests(UInt32 expectedValue, Byte[] givenBytes) { // arrange var binaryReader = new BigEndianBinaryReader(givenBytes); @@ -142,9 +141,9 @@ public void CharTests(Char expectedValue, Byte[] givenBytes) } [Theory] - [TestCase(new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] - [TestCase(new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] - public void CharArrayTests(Char[] expectedValue, Byte[] givenBytes) + [TestCase(1, new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] + [TestCase(2, new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] + public void CharArrayFromBytesTests(int testCase, Char[] expectedValue, Byte[] givenBytes) { // arrange var binaryReader = new BigEndianBinaryReader(givenBytes); diff --git a/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs b/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs index 7470cdb2..31d83878 100644 --- a/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs +++ b/src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs @@ -79,9 +79,8 @@ public void Int16Tests(Int16 number, Byte[] expectedBytes) [TestCase((UInt32)0, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase((UInt32)1, new Byte[] { 0x00, 0x00, 0x00, 0x01 })] [TestCase((UInt32)123456789, new Byte[] { 0x07, 0x5B, 0xCD, 0x15 })] - [TestCase(UInt32.MinValue, new Byte[] { 0x00, 0x00, 0x00, 0x00 })] [TestCase(UInt32.MaxValue, new Byte[] { 0xFF, 0xFF, 0xFF, 0xFF })] - public void UInt32Tests(UInt32 number, Byte[] expectedBytes) + public void UInt32ToBytesTests(UInt32 number, Byte[] expectedBytes) { // arrange var memoryStream = new MemoryStream(); @@ -197,9 +196,9 @@ public void CharTests(Char value, Byte[] expectedBytes) } [Theory] - [TestCase(new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] - [TestCase(new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] - public void CharArrayTests(Char[] value, Byte[] expectedBytes) + [TestCase(1, new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })] + [TestCase(2, new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })] + public void CharArrayToBytesTests(int testCase, Char[] value, Byte[] expectedBytes) { // arrange var memoryStream = new MemoryStream(); @@ -214,9 +213,9 @@ public void CharArrayTests(Char[] value, Byte[] expectedBytes) } [Theory] - [TestCase(new Char[] { '0', '1', '2', '3' }, 1, 2, new Byte[] { 0x31, 0x32 })] - [TestCase(new Char[] { '€', '2', '€', '€' }, 1, 2, new Byte[] { 0x32, 0xE2, 0x82, 0xAC })] - public void CharSubArrayTests(Char[] value, Int32 index, Int32 count, Byte[] expectedBytes) + [TestCase(1, new Char[] { '0', '1', '2', '3' }, 1, 2, new Byte[] { 0x31, 0x32 })] + [TestCase(2, new Char[] { '€', '2', '€', '€' }, 1, 2, new Byte[] { 0x32, 0xE2, 0x82, 0xAC })] + public void CharSubArrayTests(int testCase, Char[] value, Int32 index, Int32 count, Byte[] expectedBytes) { // arrange var memoryStream = new MemoryStream();