From 21da89d2f847ff8325fe1a1817a5e9a4c4ffff34 Mon Sep 17 00:00:00 2001 From: Dan Turco Date: Mon, 8 Feb 2016 17:31:24 +0000 Subject: [PATCH 1/2] Added support for a stream decorator pattern in TcpSocket --- src/kafka-net/Interfaces/IStreamDecorator.cs | 11 ++++++++ src/kafka-net/KafkaTcpSocket.cs | 22 +++++++++++----- src/kafka-net/kafka-net.csproj | 1 + src/kafka-tests/Fakes/FakeStreamDecorator.cs | 24 +++++++++++++++++ src/kafka-tests/Unit/AsyncLockTests.cs | 3 ++- src/kafka-tests/Unit/KafkaTcpSocketTests.cs | 27 ++++++++++++++++++++ src/kafka-tests/Unit/ProducerTests.cs | 2 +- src/kafka-tests/kafka-tests.csproj | 1 + 8 files changed, 82 insertions(+), 9 deletions(-) create mode 100644 src/kafka-net/Interfaces/IStreamDecorator.cs create mode 100644 src/kafka-tests/Fakes/FakeStreamDecorator.cs diff --git a/src/kafka-net/Interfaces/IStreamDecorator.cs b/src/kafka-net/Interfaces/IStreamDecorator.cs new file mode 100644 index 00000000..26f32459 --- /dev/null +++ b/src/kafka-net/Interfaces/IStreamDecorator.cs @@ -0,0 +1,11 @@ +using System; +using System.IO; + +namespace KafkaNet +{ + public interface IStreamDecorator + { + Type WrapStreamType { get; } + Stream WrapStream(Stream stream); + } +} \ No newline at end of file diff --git a/src/kafka-net/KafkaTcpSocket.cs b/src/kafka-net/KafkaTcpSocket.cs index 0e4eb5b9..f0b0342c 100644 --- a/src/kafka-net/KafkaTcpSocket.cs +++ b/src/kafka-net/KafkaTcpSocket.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.IO; using System.Linq; using System.Net.Sockets; using System.Threading.Tasks; @@ -41,6 +42,7 @@ public class KafkaTcpSocket : IKafkaTcpSocket private readonly AsyncLock _clientLock = new AsyncLock(); private TcpClient _client; private int _disposeCount; + private readonly IStreamDecorator _streamDecorator; /// /// Construct socket and open connection to a specified server. @@ -48,8 +50,10 @@ public class KafkaTcpSocket : IKafkaTcpSocket /// Logging facility for verbose messaging of actions. /// The IP endpoint to connect to. /// The maximum time to wait when backing off on reconnection attempts. - public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumReconnectionTimeout = null) + /// The stream decorator. + public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null) { + _streamDecorator = streamDecorator; _log = log; _endpoint = endpoint; _maximumReconnectionTimeout = maximumReconnectionTimeout ?? TimeSpan.FromMinutes(MaxReconnectionTimeoutMinutes); @@ -167,7 +171,7 @@ private void DedicatedSocketTask() } } - private void ProcessNetworkstreamTasks(NetworkStream netStream) + private void ProcessNetworkstreamTasks(Stream netStream) { Task writeTask = Task.FromResult(true); Task readTask = Task.FromResult(true); @@ -196,7 +200,7 @@ private void ProcessNetworkstreamTasks(NetworkStream netStream) } } - private async Task ProcessReadTaskAsync(NetworkStream netStream, SocketPayloadReadTask readTask) + private async Task ProcessReadTaskAsync(Stream netStream, SocketPayloadReadTask readTask) { using (readTask) { @@ -266,7 +270,7 @@ private async Task ProcessReadTaskAsync(NetworkStream netStream, SocketPayloadRe } } - private async Task ProcessSentTasksAsync(NetworkStream netStream, SocketPayloadSendTask sendTask) + private async Task ProcessSentTasksAsync(Stream netStream, SocketPayloadSendTask sendTask) { if (sendTask == null) return; @@ -305,7 +309,7 @@ private async Task ProcessSentTasksAsync(NetworkStream netStream, SocketPayloadS } } - private async Task GetStreamAsync() + private async Task GetStreamAsync() { //using a semaphore here to allow async waiting rather than blocking locks using (await _clientLock.LockAsync(_disposeToken.Token).ConfigureAwait(false)) @@ -314,8 +318,12 @@ private async Task GetStreamAsync() { _client = await ReEstablishConnectionAsync().ConfigureAwait(false); } - - return _client == null ? null : _client.GetStream(); + var currentStream = _client == null ? null : _client.GetStream(); + if (_streamDecorator != null) + { + return _streamDecorator.WrapStream(currentStream); + } + return currentStream; } } diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj index f4c10d57..91cab55d 100644 --- a/src/kafka-net/kafka-net.csproj +++ b/src/kafka-net/kafka-net.csproj @@ -49,6 +49,7 @@ + diff --git a/src/kafka-tests/Fakes/FakeStreamDecorator.cs b/src/kafka-tests/Fakes/FakeStreamDecorator.cs new file mode 100644 index 00000000..a8547e07 --- /dev/null +++ b/src/kafka-tests/Fakes/FakeStreamDecorator.cs @@ -0,0 +1,24 @@ +using System; +using System.IO; +using KafkaNet; + +namespace kafka_tests.Fakes +{ + public class FakeStreamDecorator:IStreamDecorator + { + public int CalledCount { get; set; } + + public void Reset() + { + CalledCount = 0; + } + + public Type WrapStreamType { get; private set; } + + public Stream WrapStream(Stream stream) + { + CalledCount++; + return stream; + } + } +} diff --git a/src/kafka-tests/Unit/AsyncLockTests.cs b/src/kafka-tests/Unit/AsyncLockTests.cs index 6c6cce29..4afe83d7 100644 --- a/src/kafka-tests/Unit/AsyncLockTests.cs +++ b/src/kafka-tests/Unit/AsyncLockTests.cs @@ -148,7 +148,8 @@ public void AsyncLockShouldUnlockEvenFromDifferentThreads() Assert.That(count, Is.EqualTo(1), "Only one task should have gotten past lock."); block.Release(); - TaskTest.WaitFor(() => count > 1); + //NOTE: this test is related to a timing issue of the test itself. need to give more time on the WaitFor to ensure Increment can happen + TaskTest.WaitFor(() => count > 1, 6000); Assert.That(count, Is.EqualTo(2), "Second call should get past lock."); } diff --git a/src/kafka-tests/Unit/KafkaTcpSocketTests.cs b/src/kafka-tests/Unit/KafkaTcpSocketTests.cs index 45b9aa60..6faa179a 100644 --- a/src/kafka-tests/Unit/KafkaTcpSocketTests.cs +++ b/src/kafka-tests/Unit/KafkaTcpSocketTests.cs @@ -483,6 +483,33 @@ public void WriteAndReadShouldBeAsynchronous() } } + [Test] + public void WriteAndReadShouldUseStreamDecoratorIfProvided() + { + var write = new List(); + var read = new List(); + var expected = new List { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + var decorator = new FakeStreamDecorator(); + using (var server = new FakeTcpServer(FakeServerPort)) + using (var test = new KafkaTcpSocket(new DefaultTraceLog(), _fakeServerUrl, streamDecorator: decorator)) + { + server.OnBytesReceived += data => write.AddRange(data.Batch(4).Select(x => x.ToArray().ToInt32())); + + var tasks = Enumerable.Range(1, 10) + .SelectMany(i => new[] + { + test.WriteAsync(i.ToBytes().ToPayload()), + test.ReadAsync(4).ContinueWith(t => read.Add(t.Result.ToInt32())), + server.SendDataAsync(i.ToBytes()) + }).ToArray(); + + Task.WaitAll(tasks); + Assert.That(write.OrderBy(x => x), Is.EqualTo(expected)); + Assert.That(read.OrderBy(x => x), Is.EqualTo(expected)); + Assert.That(decorator.CalledCount, Is.EqualTo(1)); + } + } + [Test] public void WriteShouldHandleLargeVolumeSendAsynchronously() { diff --git a/src/kafka-tests/Unit/ProducerTests.cs b/src/kafka-tests/Unit/ProducerTests.cs index 9cb1aabe..6131fea2 100644 --- a/src/kafka-tests/Unit/ProducerTests.cs +++ b/src/kafka-tests/Unit/ProducerTests.cs @@ -83,7 +83,7 @@ public void ProducerShouldReportCorrectAmountOfAsyncRequests() Assert.That(producer.AsyncCount, Is.EqualTo(1), "One async operation should be sending."); semaphore.Release(); - sendTask.Wait(TimeSpan.FromMilliseconds(500)); + sendTask.Wait(TimeSpan.FromMilliseconds(1000)); Assert.That(sendTask.IsCompleted, Is.True, "Send task should be marked as completed."); Assert.That(producer.AsyncCount, Is.EqualTo(0), "Async should now show zero count."); diff --git a/src/kafka-tests/kafka-tests.csproj b/src/kafka-tests/kafka-tests.csproj index 5665176b..d50ac86f 100644 --- a/src/kafka-tests/kafka-tests.csproj +++ b/src/kafka-tests/kafka-tests.csproj @@ -64,6 +64,7 @@ + From fa4d3e2957d13e60a44f22767b11cd974e9b8cab Mon Sep 17 00:00:00 2001 From: Dan Turco Date: Fri, 12 Feb 2016 17:11:21 +0000 Subject: [PATCH 2/2] Update connection and broker options to support Decorator --- src/kafka-net/BrokerRouter.cs | 4 ++-- src/kafka-net/Default/DefaultKafkaConnectionFactory.cs | 4 ++-- src/kafka-net/Interfaces/IKafkaConnectionFactory.cs | 5 +++-- src/kafka-net/Model/KafkaOptions.cs | 2 ++ src/kafka-tests/Fakes/BrokerRouterProxy.cs | 6 +++--- src/kafka-tests/Unit/BrokerRouterTests.cs | 4 ++-- 6 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/kafka-net/BrokerRouter.cs b/src/kafka-net/BrokerRouter.cs index 739af428..d759873a 100644 --- a/src/kafka-net/BrokerRouter.cs +++ b/src/kafka-net/BrokerRouter.cs @@ -33,7 +33,7 @@ public BrokerRouter(KafkaOptions kafkaOptions) foreach (var endpoint in _kafkaOptions.KafkaServerEndpoints) { - var conn = _kafkaOptions.KafkaConnectionFactory.Create(endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout); + var conn = _kafkaOptions.KafkaConnectionFactory.Create(endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout, _kafkaOptions.StreamDecorator); _defaultConnectionIndex.AddOrUpdate(endpoint, e => conn, (e, c) => conn); } @@ -213,7 +213,7 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata) } else { - connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log); + connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout, _kafkaOptions.StreamDecorator); UpsertConnectionToBrokerConnectionIndex(broker.Broker.BrokerId, connection); } } diff --git a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs index 42f8339d..5dd8e898 100644 --- a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs +++ b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs @@ -9,9 +9,9 @@ namespace KafkaNet { public class DefaultKafkaConnectionFactory : IKafkaConnectionFactory { - public IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null) + public IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null) { - return new KafkaConnection(new KafkaTcpSocket(log, endpoint, maximumReconnectionTimeout), responseTimeoutMs, log); + return new KafkaConnection(new KafkaTcpSocket(log, endpoint, maximumReconnectionTimeout, streamDecorator), responseTimeoutMs, log); } public KafkaEndpoint Resolve(Uri kafkaAddress, IKafkaLog log) diff --git a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs index 0373e666..f563bec0 100644 --- a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs +++ b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs @@ -12,8 +12,9 @@ public interface IKafkaConnectionFactory /// The amount of time to wait for a message response to be received after sending a message to Kafka /// Logging interface used to record any log messages created by the connection. /// The maximum time to wait when backing off on reconnection attempts. - /// IKafkaConnection initialized to connecto to the given endpoint. - IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null); + /// The decorator used to adjust the internal Tcp stream + /// IKafkaConnection initialized to connect to the given endpoint. + IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null); /// /// Resolves a generic Uri into a uniquely identifiable KafkaEndpoint. diff --git a/src/kafka-net/Model/KafkaOptions.cs b/src/kafka-net/Model/KafkaOptions.cs index 54dde3ad..7ec7ad40 100644 --- a/src/kafka-net/Model/KafkaOptions.cs +++ b/src/kafka-net/Model/KafkaOptions.cs @@ -65,5 +65,7 @@ public KafkaOptions(params Uri[] kafkaServerUri) KafkaConnectionFactory = new DefaultKafkaConnectionFactory(); ResponseTimeoutMs = TimeSpan.FromMilliseconds(DefaultResponseTimeout); } + + public IStreamDecorator StreamDecorator { get; set; } } } diff --git a/src/kafka-tests/Fakes/BrokerRouterProxy.cs b/src/kafka-tests/Fakes/BrokerRouterProxy.cs index b97d21ad..57368484 100644 --- a/src/kafka-tests/Fakes/BrokerRouterProxy.cs +++ b/src/kafka-tests/Fakes/BrokerRouterProxy.cs @@ -46,10 +46,10 @@ public BrokerRouterProxy(MoqMockingKernel kernel) _fakeConn1.MetadataResponseFunction = () => MetadataResponse(); _fakeConn1.OffsetResponseFunction = () => new OffsetResponse { Offsets = new List { 0, 100 }, PartitionId = 1, Topic = TestTopic }; _fakeConn1.FetchResponseFunction = () => { Thread.Sleep(500); return null; }; - + _mockKafkaConnectionFactory = _kernel.GetMock(); - _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null)).Returns(() => _fakeConn0); - _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null)).Returns(() => _fakeConn1); + _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null, null)).Returns(() => _fakeConn0); + _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null, null)).Returns(() => _fakeConn1); _mockKafkaConnectionFactory.Setup(x => x.Resolve(It.IsAny(), It.IsAny())) .Returns((uri, log) => new KafkaEndpoint { diff --git a/src/kafka-tests/Unit/BrokerRouterTests.cs b/src/kafka-tests/Unit/BrokerRouterTests.cs index c97bb78d..af945b7a 100644 --- a/src/kafka-tests/Unit/BrokerRouterTests.cs +++ b/src/kafka-tests/Unit/BrokerRouterTests.cs @@ -32,7 +32,7 @@ public void Setup() _mockPartitionSelector = _kernel.GetMock(); _mockKafkaConnection1 = _kernel.GetMock(); _mockKafkaConnectionFactory = _kernel.GetMock(); - _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null)).Returns(() => _mockKafkaConnection1.Object); + _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null, null)).Returns(() => _mockKafkaConnection1.Object); _mockKafkaConnectionFactory.Setup(x => x.Resolve(It.IsAny(), It.IsAny())) .Returns((uri, log) => new KafkaEndpoint { @@ -85,7 +85,7 @@ public void BrokerRouterUsesFactoryToAddNewBrokers() .Returns(() => Task.Run(() => new List { CreateMetaResponse() })); var topics = router.GetTopicMetadata(TestTopic); - _mockKafkaConnectionFactory.Verify(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null), Times.Once()); + _mockKafkaConnectionFactory.Verify(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null,null), Times.Once()); } #region MetadataRequest Tests...