From 2c255c8b1aed8ef1c6d05db783fdb5d0aa7737e3 Mon Sep 17 00:00:00 2001 From: buybackoff Date: Sun, 3 May 2015 03:37:56 +0300 Subject: [PATCH 1/4] use await Task.Delay instead of Thread.Sleep inside an async Task --- src/kafka-net/Consumer.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 99b9afcb..8f3a9abd 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -173,8 +173,8 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } } - //no message received from server wait a while before we try another long poll - Thread.Sleep(_options.BackoffInterval); + //no message received from server wait a while before we try another long poll + await Task.Delay(_options.BackoffInterval); } catch (BufferUnderRunException ex) { From 08f04e6684ff90ea7d3dd5fb8cbca07c5a77f731 Mon Sep 17 00:00:00 2001 From: buybackoff Date: Sun, 3 May 2015 05:08:20 +0300 Subject: [PATCH 2/4] use AsyncCollection in Consumer --- src/kafka-net/Common/AsyncCollection.cs | 28 ++++++--- src/kafka-net/Consumer.cs | 79 +++++++++++++++++++++---- 2 files changed, 86 insertions(+), 21 deletions(-) diff --git a/src/kafka-net/Common/AsyncCollection.cs b/src/kafka-net/Common/AsyncCollection.cs index 771e4ab5..75675ece 100644 --- a/src/kafka-net/Common/AsyncCollection.cs +++ b/src/kafka-net/Common/AsyncCollection.cs @@ -10,12 +10,22 @@ public class AsyncCollection { private readonly object _lock = new object(); private readonly AsyncManualResetEvent _dataAvailableEvent = new AsyncManualResetEvent(); - private readonly ConcurrentBag _bag = new ConcurrentBag(); + private readonly IProducerConsumerCollection _collection; private long _dataInBufferCount = 0; - public int Count + public AsyncCollection() + { + _collection = new ConcurrentBag(); + } + + public AsyncCollection(IProducerConsumerCollection collection) + { + _collection = collection; + } + + public int Count { - get { return _bag.Count + (int)Interlocked.Read(ref _dataInBufferCount); } + get { return _collection.Count + (int)Interlocked.Read(ref _dataInBufferCount); } } public bool IsCompleted { get; private set; } @@ -37,7 +47,7 @@ public void Add(T data) throw new ObjectDisposedException("AsyncCollection has been marked as complete. No new documents can be added."); } - _bag.Add(data); + _collection.TryAdd(data); TriggerDataAvailability(); } @@ -88,7 +98,7 @@ public async Task> TakeAsync(int count, TimeSpan timeout, CancellationTo public void DrainAndApply(Action appliedFunc) { T data; - while (_bag.TryTake(out data)) + while (_collection.TryTake(out data)) { appliedFunc(data); } @@ -99,7 +109,7 @@ public void DrainAndApply(Action appliedFunc) public IEnumerable Drain() { T data; - while (_bag.TryTake(out data)) + while (_collection.TryTake(out data)) { yield return data; } @@ -111,11 +121,11 @@ public bool TryTake(out T data) { try { - return _bag.TryTake(out data); + return _collection.TryTake(out data); } finally { - if (_bag.IsEmpty) TriggerDataAvailability(); + if (_collection.Count == 0) TriggerDataAvailability(); } } @@ -123,7 +133,7 @@ private void TriggerDataAvailability() { lock (_lock) { - if (_bag.IsEmpty) + if (_collection.Count == 0) { _dataAvailableEvent.Reset(); } diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 8f3a9abd..7c8a9e71 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -1,9 +1,11 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.ComponentModel; using System.Linq; using System.Threading; using System.Threading.Tasks; +using KafkaNet.Common; using KafkaNet.Model; using KafkaNet.Protocol; @@ -18,7 +20,8 @@ namespace KafkaNet public class Consumer : IMetadataQueries, IDisposable { private readonly ConsumerOptions _options; - private readonly BlockingCollection _fetchResponseQueue; + private readonly AsyncCollection _fetchResponseCollection; + private readonly SemaphoreSlim _boundedCapacitySemaphore; private readonly CancellationTokenSource _disposeToken = new CancellationTokenSource(); private readonly ConcurrentDictionary _partitionPollingIndex = new ConcurrentDictionary(); private readonly ConcurrentDictionary _partitionOffsetIndex = new ConcurrentDictionary(); @@ -31,9 +34,10 @@ public class Consumer : IMetadataQueries, IDisposable public Consumer(ConsumerOptions options, params OffsetPosition[] positions) { _options = options; - _fetchResponseQueue = new BlockingCollection(_options.ConsumerBufferSize); + _fetchResponseCollection = new AsyncCollection(new ConcurrentQueue()); + _boundedCapacitySemaphore = new SemaphoreSlim(_options.ConsumerBufferSize, _options.ConsumerBufferSize); _metadataQueries = new MetadataQueries(_options.Router); - + SetOffsetPosition(positions); } @@ -50,7 +54,57 @@ public IEnumerable Consume(CancellationToken? cancellationToken = null) { _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic); EnsurePartitionPollingThreads(); - return _fetchResponseQueue.GetConsumingEnumerable(cancellationToken ?? CancellationToken.None); + while (true) + { + try + { + // blocking wait for data + _fetchResponseCollection.OnHasDataAvailable(cancellationToken ?? CancellationToken.None).Wait(); + } + catch (AggregateException e) + { + if (e.InnerException is TaskCanceledException) + { + throw new OperationCanceledException(); + } + throw; + } + Message data; + // do not call OnHasDataAvailable if data is available + while (_fetchResponseCollection.TryTake(out data)) + { + _boundedCapacitySemaphore.Release(); + yield return data; + } + } + } + + /// + /// Pull next message for consumption asynchronously + /// + public async Task ConsumeNextAsync(TimeSpan timeout, CancellationToken token) + { + // this will only iterlocked.increment/decrement(_ensureOneThread) and compare to 1 on each call + EnsurePartitionPollingThreads(); + + if (_fetchResponseCollection.IsCompleted) + { + var tcs = new TaskCompletionSource(); + tcs.SetCanceled(); + return await tcs.Task; + } + try + { + var result = (await _fetchResponseCollection.TakeAsync(1, timeout, token)).Single(); + _boundedCapacitySemaphore.Release(); + return result; + } + catch (Exception e) + { + var tcs = new TaskCompletionSource(); + tcs.SetException(e); + return await tcs.Task; + } } /// @@ -139,11 +193,11 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) var fetches = new List { fetch }; var fetchRequest = new FetchRequest - { - MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds), - MinBytes = _options.MinimumBytes, - Fetches = fetches - }; + { + MaxWaitTime = (int)Math.Min((long)int.MaxValue, _options.MaxWaitTimeForMinimumBytes.TotalMilliseconds), + MinBytes = _options.MinimumBytes, + Fetches = fetches + }; //make request and post to queue var route = _options.Router.SelectBrokerRoute(topic, partitionId); @@ -160,7 +214,8 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) foreach (var message in response.Messages) { - _fetchResponseQueue.Add(message, _disposeToken.Token); + await _boundedCapacitySemaphore.WaitAsync(_disposeToken.Token); + _fetchResponseCollection.Add(message); if (_disposeToken.IsCancellationRequested) return; } @@ -173,8 +228,8 @@ private Task ConsumeTopicPartitionAsync(string topic, int partitionId) } } - //no message received from server wait a while before we try another long poll - await Task.Delay(_options.BackoffInterval); + //no message received from server wait a while before we try another long poll + await Task.Delay(_options.BackoffInterval); } catch (BufferUnderRunException ex) { From 7c44e7ec30436b3bd91c2c5549cedf09537a47da Mon Sep 17 00:00:00 2001 From: buybackoff Date: Sun, 3 May 2015 05:48:17 +0300 Subject: [PATCH 3/4] add a simple test for ConsumeNextAsync --- src/kafka-net/Consumer.cs | 14 ++++++- .../ProducerConsumerIntegrationTests.cs | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 7c8a9e71..400c595b 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -54,6 +54,12 @@ public IEnumerable Consume(CancellationToken? cancellationToken = null) { _options.Log.DebugFormat("Consumer: Beginning consumption of topic: {0}", _options.Topic); EnsurePartitionPollingThreads(); + // need a separate method with yield, otherwise EnsurePartitionPollingThreads() is not called until enumeration, e.g. ToArray() + return DoConsume(cancellationToken); + } + + private IEnumerable DoConsume(CancellationToken? cancellationToken = null) + { while (true) { try @@ -79,8 +85,14 @@ public IEnumerable Consume(CancellationToken? cancellationToken = null) } } + + public async Task ConsumeNextAsync() + { + return await ConsumeNextAsync(new TimeSpan(int.MaxValue), CancellationToken.None); + } + /// - /// Pull next message for consumption asynchronously + /// Pull a next message for consumption asynchronously /// public async Task ConsumeNextAsync(TimeSpan timeout, CancellationToken token) { diff --git a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs index 9a88d9d0..cd525a8b 100644 --- a/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs +++ b/src/kafka-tests/Integration/ProducerConsumerIntegrationTests.cs @@ -74,6 +74,44 @@ public void ConsumerShouldConsumeInSameOrderAsProduced() } } + + [Test] + public void AsyncConsumerShouldConsumeInSameOrderAsProduced() + { + var expected = new List { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19" }; + var testId = Guid.NewGuid().ToString(); + + using (var router = new BrokerRouter(new KafkaOptions(IntegrationConfig.IntegrationUri))) + using (var producer = new Producer(router)) + { + + var offsets = producer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result; + + using (var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), + offsets.Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray())) + { + + for (int i = 0; i < 20; i++) + { + producer.SendMessageAsync(IntegrationConfig.IntegrationTopic, new[] { new Message(i.ToString(), testId) }).Wait(); + } + List results = new List(); + + for (int i = 0; i < 20; i++) + { + results.Add(consumer.ConsumeNextAsync().Result); + } + + //ensure the produced messages arrived + Console.WriteLine("Message order: {0}", string.Join(", ", results.Select(x => x.Value.ToUtf8String()).ToList())); + + Assert.That(results.Count, Is.EqualTo(20)); + Assert.That(results.Select(x => x.Value.ToUtf8String()).ToList(), Is.EqualTo(expected), "Expected the message list in the correct order."); + Assert.That(results.Any(x => x.Key.ToUtf8String() != testId), Is.False); + } + } + } + [Test] public void ConsumerShouldBeAbleToSeekBackToEarlierOffset() { From 52770ebd34fe55cf8f1d24adb8cca8a3a2c59262 Mon Sep 17 00:00:00 2001 From: buybackoff Date: Sun, 3 May 2015 06:08:42 +0300 Subject: [PATCH 4/4] remove await in catch block --- src/kafka-net/Consumer.cs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/kafka-net/Consumer.cs b/src/kafka-net/Consumer.cs index 400c595b..ab1abdad 100644 --- a/src/kafka-net/Consumer.cs +++ b/src/kafka-net/Consumer.cs @@ -105,18 +105,9 @@ public async Task ConsumeNextAsync(TimeSpan timeout, CancellationToken tcs.SetCanceled(); return await tcs.Task; } - try - { - var result = (await _fetchResponseCollection.TakeAsync(1, timeout, token)).Single(); - _boundedCapacitySemaphore.Release(); - return result; - } - catch (Exception e) - { - var tcs = new TaskCompletionSource(); - tcs.SetException(e); - return await tcs.Task; - } + var result = (await _fetchResponseCollection.TakeAsync(1, timeout, token)).Single(); + _boundedCapacitySemaphore.Release(); + return result; } ///