diff --git a/src/kafka-net/BrokerRouter.cs b/src/kafka-net/BrokerRouter.cs
index 739af428..d759873a 100644
--- a/src/kafka-net/BrokerRouter.cs
+++ b/src/kafka-net/BrokerRouter.cs
@@ -33,7 +33,7 @@ public BrokerRouter(KafkaOptions kafkaOptions)
foreach (var endpoint in _kafkaOptions.KafkaServerEndpoints)
{
- var conn = _kafkaOptions.KafkaConnectionFactory.Create(endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout);
+ var conn = _kafkaOptions.KafkaConnectionFactory.Create(endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout, _kafkaOptions.StreamDecorator);
_defaultConnectionIndex.AddOrUpdate(endpoint, e => conn, (e, c) => conn);
}
@@ -213,7 +213,7 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata)
}
else
{
- connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log);
+ connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout, _kafkaOptions.StreamDecorator);
UpsertConnectionToBrokerConnectionIndex(broker.Broker.BrokerId, connection);
}
}
diff --git a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs
index 42f8339d..5dd8e898 100644
--- a/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs
+++ b/src/kafka-net/Default/DefaultKafkaConnectionFactory.cs
@@ -9,9 +9,9 @@ namespace KafkaNet
{
public class DefaultKafkaConnectionFactory : IKafkaConnectionFactory
{
- public IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null)
+ public IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null)
{
- return new KafkaConnection(new KafkaTcpSocket(log, endpoint, maximumReconnectionTimeout), responseTimeoutMs, log);
+ return new KafkaConnection(new KafkaTcpSocket(log, endpoint, maximumReconnectionTimeout, streamDecorator), responseTimeoutMs, log);
}
public KafkaEndpoint Resolve(Uri kafkaAddress, IKafkaLog log)
diff --git a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs
index 0373e666..f563bec0 100644
--- a/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs
+++ b/src/kafka-net/Interfaces/IKafkaConnectionFactory.cs
@@ -12,8 +12,9 @@ public interface IKafkaConnectionFactory
/// The amount of time to wait for a message response to be received after sending a message to Kafka
/// Logging interface used to record any log messages created by the connection.
/// The maximum time to wait when backing off on reconnection attempts.
- /// IKafkaConnection initialized to connecto to the given endpoint.
- IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null);
+ /// The decorator used to adjust the internal Tcp stream
+ /// IKafkaConnection initialized to connect to the given endpoint.
+ IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null);
///
/// Resolves a generic Uri into a uniquely identifiable KafkaEndpoint.
diff --git a/src/kafka-net/Interfaces/IStreamDecorator.cs b/src/kafka-net/Interfaces/IStreamDecorator.cs
new file mode 100644
index 00000000..26f32459
--- /dev/null
+++ b/src/kafka-net/Interfaces/IStreamDecorator.cs
@@ -0,0 +1,11 @@
+using System;
+using System.IO;
+
+namespace KafkaNet
+{
+ public interface IStreamDecorator
+ {
+ Type WrapStreamType { get; }
+ Stream WrapStream(Stream stream);
+ }
+}
\ No newline at end of file
diff --git a/src/kafka-net/KafkaTcpSocket.cs b/src/kafka-net/KafkaTcpSocket.cs
index 0e4eb5b9..f0b0342c 100644
--- a/src/kafka-net/KafkaTcpSocket.cs
+++ b/src/kafka-net/KafkaTcpSocket.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
@@ -41,6 +42,7 @@ public class KafkaTcpSocket : IKafkaTcpSocket
private readonly AsyncLock _clientLock = new AsyncLock();
private TcpClient _client;
private int _disposeCount;
+ private readonly IStreamDecorator _streamDecorator;
///
/// Construct socket and open connection to a specified server.
@@ -48,8 +50,10 @@ public class KafkaTcpSocket : IKafkaTcpSocket
/// Logging facility for verbose messaging of actions.
/// The IP endpoint to connect to.
/// The maximum time to wait when backing off on reconnection attempts.
- public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumReconnectionTimeout = null)
+ /// The stream decorator.
+ public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null)
{
+ _streamDecorator = streamDecorator;
_log = log;
_endpoint = endpoint;
_maximumReconnectionTimeout = maximumReconnectionTimeout ?? TimeSpan.FromMinutes(MaxReconnectionTimeoutMinutes);
@@ -167,7 +171,7 @@ private void DedicatedSocketTask()
}
}
- private void ProcessNetworkstreamTasks(NetworkStream netStream)
+ private void ProcessNetworkstreamTasks(Stream netStream)
{
Task writeTask = Task.FromResult(true);
Task readTask = Task.FromResult(true);
@@ -196,7 +200,7 @@ private void ProcessNetworkstreamTasks(NetworkStream netStream)
}
}
- private async Task ProcessReadTaskAsync(NetworkStream netStream, SocketPayloadReadTask readTask)
+ private async Task ProcessReadTaskAsync(Stream netStream, SocketPayloadReadTask readTask)
{
using (readTask)
{
@@ -266,7 +270,7 @@ private async Task ProcessReadTaskAsync(NetworkStream netStream, SocketPayloadRe
}
}
- private async Task ProcessSentTasksAsync(NetworkStream netStream, SocketPayloadSendTask sendTask)
+ private async Task ProcessSentTasksAsync(Stream netStream, SocketPayloadSendTask sendTask)
{
if (sendTask == null) return;
@@ -305,7 +309,7 @@ private async Task ProcessSentTasksAsync(NetworkStream netStream, SocketPayloadS
}
}
- private async Task GetStreamAsync()
+ private async Task GetStreamAsync()
{
//using a semaphore here to allow async waiting rather than blocking locks
using (await _clientLock.LockAsync(_disposeToken.Token).ConfigureAwait(false))
@@ -314,8 +318,12 @@ private async Task GetStreamAsync()
{
_client = await ReEstablishConnectionAsync().ConfigureAwait(false);
}
-
- return _client == null ? null : _client.GetStream();
+ var currentStream = _client == null ? null : _client.GetStream();
+ if (_streamDecorator != null)
+ {
+ return _streamDecorator.WrapStream(currentStream);
+ }
+ return currentStream;
}
}
diff --git a/src/kafka-net/Model/KafkaOptions.cs b/src/kafka-net/Model/KafkaOptions.cs
index 54dde3ad..7ec7ad40 100644
--- a/src/kafka-net/Model/KafkaOptions.cs
+++ b/src/kafka-net/Model/KafkaOptions.cs
@@ -65,5 +65,7 @@ public KafkaOptions(params Uri[] kafkaServerUri)
KafkaConnectionFactory = new DefaultKafkaConnectionFactory();
ResponseTimeoutMs = TimeSpan.FromMilliseconds(DefaultResponseTimeout);
}
+
+ public IStreamDecorator StreamDecorator { get; set; }
}
}
diff --git a/src/kafka-net/kafka-net.csproj b/src/kafka-net/kafka-net.csproj
index f4c10d57..91cab55d 100644
--- a/src/kafka-net/kafka-net.csproj
+++ b/src/kafka-net/kafka-net.csproj
@@ -49,6 +49,7 @@
+
diff --git a/src/kafka-tests/Fakes/BrokerRouterProxy.cs b/src/kafka-tests/Fakes/BrokerRouterProxy.cs
index b97d21ad..57368484 100644
--- a/src/kafka-tests/Fakes/BrokerRouterProxy.cs
+++ b/src/kafka-tests/Fakes/BrokerRouterProxy.cs
@@ -46,10 +46,10 @@ public BrokerRouterProxy(MoqMockingKernel kernel)
_fakeConn1.MetadataResponseFunction = () => MetadataResponse();
_fakeConn1.OffsetResponseFunction = () => new OffsetResponse { Offsets = new List { 0, 100 }, PartitionId = 1, Topic = TestTopic };
_fakeConn1.FetchResponseFunction = () => { Thread.Sleep(500); return null; };
-
+
_mockKafkaConnectionFactory = _kernel.GetMock();
- _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null)).Returns(() => _fakeConn0);
- _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null)).Returns(() => _fakeConn1);
+ _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null, null)).Returns(() => _fakeConn0);
+ _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null, null)).Returns(() => _fakeConn1);
_mockKafkaConnectionFactory.Setup(x => x.Resolve(It.IsAny(), It.IsAny()))
.Returns((uri, log) => new KafkaEndpoint
{
diff --git a/src/kafka-tests/Fakes/FakeStreamDecorator.cs b/src/kafka-tests/Fakes/FakeStreamDecorator.cs
new file mode 100644
index 00000000..a8547e07
--- /dev/null
+++ b/src/kafka-tests/Fakes/FakeStreamDecorator.cs
@@ -0,0 +1,24 @@
+using System;
+using System.IO;
+using KafkaNet;
+
+namespace kafka_tests.Fakes
+{
+ public class FakeStreamDecorator:IStreamDecorator
+ {
+ public int CalledCount { get; set; }
+
+ public void Reset()
+ {
+ CalledCount = 0;
+ }
+
+ public Type WrapStreamType { get; private set; }
+
+ public Stream WrapStream(Stream stream)
+ {
+ CalledCount++;
+ return stream;
+ }
+ }
+}
diff --git a/src/kafka-tests/Unit/AsyncLockTests.cs b/src/kafka-tests/Unit/AsyncLockTests.cs
index 6c6cce29..4afe83d7 100644
--- a/src/kafka-tests/Unit/AsyncLockTests.cs
+++ b/src/kafka-tests/Unit/AsyncLockTests.cs
@@ -148,7 +148,8 @@ public void AsyncLockShouldUnlockEvenFromDifferentThreads()
Assert.That(count, Is.EqualTo(1), "Only one task should have gotten past lock.");
block.Release();
- TaskTest.WaitFor(() => count > 1);
+ //NOTE: this test is related to a timing issue of the test itself. need to give more time on the WaitFor to ensure Increment can happen
+ TaskTest.WaitFor(() => count > 1, 6000);
Assert.That(count, Is.EqualTo(2), "Second call should get past lock.");
}
diff --git a/src/kafka-tests/Unit/BrokerRouterTests.cs b/src/kafka-tests/Unit/BrokerRouterTests.cs
index c97bb78d..af945b7a 100644
--- a/src/kafka-tests/Unit/BrokerRouterTests.cs
+++ b/src/kafka-tests/Unit/BrokerRouterTests.cs
@@ -32,7 +32,7 @@ public void Setup()
_mockPartitionSelector = _kernel.GetMock();
_mockKafkaConnection1 = _kernel.GetMock();
_mockKafkaConnectionFactory = _kernel.GetMock();
- _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null)).Returns(() => _mockKafkaConnection1.Object);
+ _mockKafkaConnectionFactory.Setup(x => x.Create(It.Is(e => e.Endpoint.Port == 1), It.IsAny(), It.IsAny(), null, null)).Returns(() => _mockKafkaConnection1.Object);
_mockKafkaConnectionFactory.Setup(x => x.Resolve(It.IsAny(), It.IsAny()))
.Returns((uri, log) => new KafkaEndpoint
{
@@ -85,7 +85,7 @@ public void BrokerRouterUsesFactoryToAddNewBrokers()
.Returns(() => Task.Run(() => new List { CreateMetaResponse() }));
var topics = router.GetTopicMetadata(TestTopic);
- _mockKafkaConnectionFactory.Verify(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null), Times.Once());
+ _mockKafkaConnectionFactory.Verify(x => x.Create(It.Is(e => e.Endpoint.Port == 2), It.IsAny(), It.IsAny(), null,null), Times.Once());
}
#region MetadataRequest Tests...
diff --git a/src/kafka-tests/Unit/KafkaTcpSocketTests.cs b/src/kafka-tests/Unit/KafkaTcpSocketTests.cs
index 45b9aa60..6faa179a 100644
--- a/src/kafka-tests/Unit/KafkaTcpSocketTests.cs
+++ b/src/kafka-tests/Unit/KafkaTcpSocketTests.cs
@@ -483,6 +483,33 @@ public void WriteAndReadShouldBeAsynchronous()
}
}
+ [Test]
+ public void WriteAndReadShouldUseStreamDecoratorIfProvided()
+ {
+ var write = new List();
+ var read = new List();
+ var expected = new List { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+ var decorator = new FakeStreamDecorator();
+ using (var server = new FakeTcpServer(FakeServerPort))
+ using (var test = new KafkaTcpSocket(new DefaultTraceLog(), _fakeServerUrl, streamDecorator: decorator))
+ {
+ server.OnBytesReceived += data => write.AddRange(data.Batch(4).Select(x => x.ToArray().ToInt32()));
+
+ var tasks = Enumerable.Range(1, 10)
+ .SelectMany(i => new[]
+ {
+ test.WriteAsync(i.ToBytes().ToPayload()),
+ test.ReadAsync(4).ContinueWith(t => read.Add(t.Result.ToInt32())),
+ server.SendDataAsync(i.ToBytes())
+ }).ToArray();
+
+ Task.WaitAll(tasks);
+ Assert.That(write.OrderBy(x => x), Is.EqualTo(expected));
+ Assert.That(read.OrderBy(x => x), Is.EqualTo(expected));
+ Assert.That(decorator.CalledCount, Is.EqualTo(1));
+ }
+ }
+
[Test]
public void WriteShouldHandleLargeVolumeSendAsynchronously()
{
diff --git a/src/kafka-tests/Unit/ProducerTests.cs b/src/kafka-tests/Unit/ProducerTests.cs
index 9cb1aabe..6131fea2 100644
--- a/src/kafka-tests/Unit/ProducerTests.cs
+++ b/src/kafka-tests/Unit/ProducerTests.cs
@@ -83,7 +83,7 @@ public void ProducerShouldReportCorrectAmountOfAsyncRequests()
Assert.That(producer.AsyncCount, Is.EqualTo(1), "One async operation should be sending.");
semaphore.Release();
- sendTask.Wait(TimeSpan.FromMilliseconds(500));
+ sendTask.Wait(TimeSpan.FromMilliseconds(1000));
Assert.That(sendTask.IsCompleted, Is.True, "Send task should be marked as completed.");
Assert.That(producer.AsyncCount, Is.EqualTo(0), "Async should now show zero count.");
diff --git a/src/kafka-tests/kafka-tests.csproj b/src/kafka-tests/kafka-tests.csproj
index 5665176b..d50ac86f 100644
--- a/src/kafka-tests/kafka-tests.csproj
+++ b/src/kafka-tests/kafka-tests.csproj
@@ -64,6 +64,7 @@
+