Skip to content

Applied a timeout to Producer.SendMessageAsync for the case when the connection is unavailable #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/kafka-net/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,5 +234,95 @@ public static Exception ExtractException(this Task task)

return new ApplicationException("Unknown exception occured.");
}

/// <summary>
/// https://blogs.msdn.microsoft.com/pfxteam/2011/11/10/crafting-a-task-timeoutafter-method/
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="tcs"></param>
/// <param name="millisecondsTimeout"></param>
/// <param name="exceptionOnTimeout">exception to be set on timeout</param>
/// <param name="resultOnTimeout">result to be set on timeout (in case exceptionOnTimeout is null)</param>
/// <returns></returns>
public static Task<T> TimeoutAfter<T>(
this TaskCompletionSource<T> tcs,
int millisecondsTimeout,
Exception exceptionOnTimeout = null,
T resultOnTimeout = default(T)
)
{
if (tcs == null) throw new ArgumentNullException("tcs");
// Short-circuit #1: infinite timeout or task already completed
if (tcs.Task.IsCompleted || (millisecondsTimeout == Timeout.Infinite))
{
// Either the task has already completed or timeout will never occur.
// No proxy necessary.
return tcs.Task;
}

// Short-circuit #2: zero timeout
if (millisecondsTimeout == 0)
{
// We've already timed out.
if (exceptionOnTimeout != null)
tcs.TrySetException(exceptionOnTimeout);
else
tcs.TrySetResult(resultOnTimeout);
return tcs.Task;
}

// Set up a timer to complete after the specified timeout period
Timer timer = new Timer(state =>
{
// Recover your state information
var myTcs = (TaskCompletionSource<T>)state;

// Fault our proxy with a TimeoutException
if (exceptionOnTimeout != null)
myTcs.TrySetException(exceptionOnTimeout);
else
myTcs.TrySetResult(resultOnTimeout);
}, tcs, millisecondsTimeout, Timeout.Infinite);

// Wire up the logic for what happens when source task completes
tcs.Task.ContinueWith((antecedent, state) =>
{
// Recover our state data
var tuple =
(Tuple<Timer, TaskCompletionSource<T>>)state;

// Cancel the Timer
tuple.Item1.Dispose();

// Marshal results to proxy
MarshalTaskResults(antecedent, tuple.Item2);
},
Tuple.Create(timer, tcs),
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);

return tcs.Task;
}

/// <summary>
/// Copies the completion state of <paramref name="source"/> onto <paramref name="proxy"/>.
/// Uses TrySet* throughout, so it is harmless if the proxy has already completed.
/// </summary>
/// <typeparam name="TResult">Result type of the proxy.</typeparam>
/// <param name="source">A completed task whose outcome should be mirrored.</param>
/// <param name="proxy">The completion source receiving the mirrored outcome.</param>
internal static void MarshalTaskResults<TResult>(
    Task source, TaskCompletionSource<TResult> proxy)
{
    switch (source.Status)
    {
        case TaskStatus.Faulted:
            // Propagate the inner exceptions rather than source.Exception itself;
            // passing the AggregateException directly would nest it inside a
            // second AggregateException on proxy.Task.
            proxy.TrySetException(source.Exception.InnerExceptions);
            break;
        case TaskStatus.Canceled:
            proxy.TrySetCanceled();
            break;
        case TaskStatus.RanToCompletion:
            Task<TResult> castedSource = source as Task<TResult>;
            proxy.TrySetResult(
                castedSource == null ? default(TResult) : // source is a non-generic Task: no result to carry
                castedSource.Result); // source is a Task<TResult>
            break;
        // Any other status means the source has not completed; callers are
        // expected to invoke this only from a completion continuation.
    }
}
}
}
19 changes: 15 additions & 4 deletions src/kafka-net/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ public class Producer : IMetadataQueries
{
private const int MaxDisposeWaitSeconds = 30;
private const int DefaultAckTimeoutMS = 1000;
private const int DefaultMessageTimeoutMS = DefaultAckTimeoutMS + 1000;
private const int MaximumAsyncRequests = 20;
private const int MaximumMessageBuffer = 1000;
private const int DefaultBatchDelayMS = 100;
private const int DefaultBatchSize = 100;
public const short ErrorOnTimeout = -1;

private readonly CancellationTokenSource _stopToken = new CancellationTokenSource();
private readonly int _maximumAsyncRequests;
Expand Down Expand Up @@ -109,13 +111,14 @@ public Producer(IBrokerRouter brokerRouter, int maximumAsyncRequests = MaximumAs
/// <param name="codec">The codec to apply to the message collection. Defaults to none.</param>
/// <returns>List of ProduceResponses from each partition sent to or empty list if acks = 0.</returns>
public async Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumerable<Message> messages, Int16 acks = 1,
TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone)
TimeSpan? timeout = null, MessageCodec codec = MessageCodec.CodecNone, TimeSpan? messageTimeout = null)
{
if (_stopToken.IsCancellationRequested)
throw new ObjectDisposedException("Cannot send new documents as producer is disposing.");
if (timeout == null) timeout = TimeSpan.FromMilliseconds(DefaultAckTimeoutMS);
if (messageTimeout == null) messageTimeout = TimeSpan.FromMilliseconds(DefaultMessageTimeoutMS);

var batch = messages.Select(message => new TopicMessage
var batch = messages.Select(message => new TopicMessage(messageTimeout.Value)
{
Acks = acks,
Codec = codec,
Expand All @@ -126,7 +129,7 @@ public async Task<List<ProduceResponse>> SendMessageAsync(string topic, IEnumera

_asyncCollection.AddRange(batch);

await Task.WhenAll(batch.Select(x => x.Tcs.Task));
await Task.WhenAll(batch.Select(x => x.Tcs.Task)).ConfigureAwait(false);

return batch.Select(topicMessage => topicMessage.Tcs.Task.Result)
.Distinct()
Expand Down Expand Up @@ -332,9 +335,17 @@ class TopicMessage
public string Topic { get; set; }
public Message Message { get; set; }

public TopicMessage()
// Creates the pending-response TCS and arms a timeout that completes it with an
// ErrorOnTimeout ProduceResponse if no broker response arrives within messageTimeout.
public TopicMessage(TimeSpan messageTimeout)
{
    Tcs = new TaskCompletionSource<ProduceResponse>();
    Tcs.TimeoutAfter(
        (int)messageTimeout.TotalMilliseconds,
        null, // no exception on timeout; surface a sentinel result instead
        new ProduceResponse()
        {
            // NOTE(review): Topic is read here while the constructor is still running,
            // but SendMessageAsync sets Topic via an object initializer, which executes
            // AFTER the constructor — so this appears to capture null rather than the
            // intended topic name. Verify consumers of the timeout response do not
            // rely on Topic being populated.
            Topic = Topic,
            Error = Producer.ErrorOnTimeout
        });
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/kafka-tests/Integration/ProducerIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,26 @@ public async void SendAsyncShouldGetOneResultForEachPartitionThroughBatching()
Assert.That(result.Count, Is.EqualTo(2));
}
}

[TestCase(0)]
[TestCase(1)]
[MaxTime(5000)]
public async Task UnreachableHost_Timeout_Test(short acks)
{
    // A broker endpoint nothing is listening on, so the send can never succeed
    // and must be completed by the message timeout instead.
    using (var router = new BrokerRouter(new KafkaOptions(new Uri("http://localhost:1234"))))
    using (var producer = new Producer(router))
    {
        var responses = await producer.SendMessageAsync(
            IntegrationConfig.IntegrationTopic,
            new[] { new Message("1") },
            acks,
            messageTimeout: TimeSpan.FromMilliseconds(200)
        ).ConfigureAwait(false);

        // The timeout should surface exactly one sentinel response marked ErrorOnTimeout.
        Assert.That(responses.Count, Is.EqualTo(1));
        Assert.That(responses[0].Error, Is.EqualTo(Producer.ErrorOnTimeout));
    }
}

}
}
9 changes: 4 additions & 5 deletions src/kafka-tests/Unit/BigEndianBinaryReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ public void Int32Tests(Int32 expectedValue, Byte[] givenBytes)
[TestCase((UInt32)0, new Byte[] { 0x00, 0x00, 0x00, 0x00 })]
[TestCase((UInt32)1, new Byte[] { 0x00, 0x00, 0x00, 0x01 })]
[TestCase((UInt32)123456789, new Byte[] { 0x07, 0x5B, 0xCD, 0x15 })]
[TestCase(UInt32.MinValue, new Byte[] { 0x00, 0x00, 0x00, 0x00 })]
[TestCase(UInt32.MaxValue, new Byte[] { 0xFF, 0xFF, 0xFF, 0xFF })]
public void UInt32Tests(UInt32 expectedValue, Byte[] givenBytes)
public void UInt32FromBytesTests(UInt32 expectedValue, Byte[] givenBytes)
{
// arrange
var binaryReader = new BigEndianBinaryReader(givenBytes);
Expand Down Expand Up @@ -142,9 +141,9 @@ public void CharTests(Char expectedValue, Byte[] givenBytes)
}

[Theory]
[TestCase(new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })]
[TestCase(new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })]
public void CharArrayTests(Char[] expectedValue, Byte[] givenBytes)
[TestCase(1, new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })]
[TestCase(2, new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })]
public void CharArrayFromBytesTests(int testCase, Char[] expectedValue, Byte[] givenBytes)
{
// arrange
var binaryReader = new BigEndianBinaryReader(givenBytes);
Expand Down
15 changes: 7 additions & 8 deletions src/kafka-tests/Unit/BigEndianBinaryWriterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public void Int16Tests(Int16 number, Byte[] expectedBytes)
[TestCase((UInt32)0, new Byte[] { 0x00, 0x00, 0x00, 0x00 })]
[TestCase((UInt32)1, new Byte[] { 0x00, 0x00, 0x00, 0x01 })]
[TestCase((UInt32)123456789, new Byte[] { 0x07, 0x5B, 0xCD, 0x15 })]
[TestCase(UInt32.MinValue, new Byte[] { 0x00, 0x00, 0x00, 0x00 })]
[TestCase(UInt32.MaxValue, new Byte[] { 0xFF, 0xFF, 0xFF, 0xFF })]
public void UInt32Tests(UInt32 number, Byte[] expectedBytes)
public void UInt32ToBytesTests(UInt32 number, Byte[] expectedBytes)
{
// arrange
var memoryStream = new MemoryStream();
Expand Down Expand Up @@ -197,9 +196,9 @@ public void CharTests(Char value, Byte[] expectedBytes)
}

[Theory]
[TestCase(new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })]
[TestCase(new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })]
public void CharArrayTests(Char[] value, Byte[] expectedBytes)
[TestCase(1, new Char[] { '0', '0', '0', '0' }, new Byte[] { 0x30, 0x30, 0x30, 0x30 })]
[TestCase(2, new Char[] { '€', '€', '€', '€' }, new Byte[] { 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC, 0xE2, 0x82, 0xAC })]
public void CharArrayToBytesTests(int testCase, Char[] value, Byte[] expectedBytes)
{
// arrange
var memoryStream = new MemoryStream();
Expand All @@ -214,9 +213,9 @@ public void CharArrayTests(Char[] value, Byte[] expectedBytes)
}

[Theory]
[TestCase(new Char[] { '0', '1', '2', '3' }, 1, 2, new Byte[] { 0x31, 0x32 })]
[TestCase(new Char[] { '€', '2', '€', '€' }, 1, 2, new Byte[] { 0x32, 0xE2, 0x82, 0xAC })]
public void CharSubArrayTests(Char[] value, Int32 index, Int32 count, Byte[] expectedBytes)
[TestCase(1, new Char[] { '0', '1', '2', '3' }, 1, 2, new Byte[] { 0x31, 0x32 })]
[TestCase(2, new Char[] { '€', '2', '€', '€' }, 1, 2, new Byte[] { 0x32, 0xE2, 0x82, 0xAC })]
public void CharSubArrayTests(int testCase, Char[] value, Int32 index, Int32 count, Byte[] expectedBytes)
{
// arrange
var memoryStream = new MemoryStream();
Expand Down