Skip to content

Added support for a stream decorator pattern in TcpSocket #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/kafka-net/BrokerRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public BrokerRouter(KafkaOptions kafkaOptions)

foreach (var endpoint in _kafkaOptions.KafkaServerEndpoints)
{
var conn = _kafkaOptions.KafkaConnectionFactory.Create(endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout);
var conn = _kafkaOptions.KafkaConnectionFactory.Create(endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout, _kafkaOptions.StreamDecorator);
_defaultConnectionIndex.AddOrUpdate(endpoint, e => conn, (e, c) => conn);
}

Expand Down Expand Up @@ -213,7 +213,7 @@ private void UpdateInternalMetadataCache(MetadataResponse metadata)
}
else
{
connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log);
connection = _kafkaOptions.KafkaConnectionFactory.Create(broker.Endpoint, _kafkaOptions.ResponseTimeoutMs, _kafkaOptions.Log, _kafkaOptions.MaximumReconnectionTimeout, _kafkaOptions.StreamDecorator);
UpsertConnectionToBrokerConnectionIndex(broker.Broker.BrokerId, connection);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/kafka-net/Default/DefaultKafkaConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace KafkaNet
{
public class DefaultKafkaConnectionFactory : IKafkaConnectionFactory
{
public IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null)
public IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null)
{
return new KafkaConnection(new KafkaTcpSocket(log, endpoint, maximumReconnectionTimeout), responseTimeoutMs, log);
return new KafkaConnection(new KafkaTcpSocket(log, endpoint, maximumReconnectionTimeout, streamDecorator), responseTimeoutMs, log);
}

public KafkaEndpoint Resolve(Uri kafkaAddress, IKafkaLog log)
Expand Down
5 changes: 3 additions & 2 deletions src/kafka-net/Interfaces/IKafkaConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ public interface IKafkaConnectionFactory
/// <param name="responseTimeoutMs">The amount of time to wait for a message response to be received after sending a message to Kafka</param>
/// <param name="log">Logging interface used to record any log messages created by the connection.</param>
/// <param name="maximumReconnectionTimeout">The maximum time to wait when backing off on reconnection attempts.</param>
/// <returns>IKafkaConnection initialized to connecto to the given endpoint.</returns>
IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null);
/// <param name="streamDecorator">The decorator used to adjust the internal Tcp stream</param>
/// <returns>IKafkaConnection initialized to connect to the given endpoint.</returns>
IKafkaConnection Create(KafkaEndpoint endpoint, TimeSpan responseTimeoutMs, IKafkaLog log, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null);

/// <summary>
/// Resolves a generic Uri into a uniquely identifiable KafkaEndpoint.
Expand Down
11 changes: 11 additions & 0 deletions src/kafka-net/Interfaces/IStreamDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.IO;

namespace KafkaNet
{
public interface IStreamDecorator
{
Type WrapStreamType { get; }
Stream WrapStream(Stream stream);
}
}
22 changes: 15 additions & 7 deletions src/kafka-net/KafkaTcpSocket.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Threading.Tasks;
Expand Down Expand Up @@ -41,15 +42,18 @@ public class KafkaTcpSocket : IKafkaTcpSocket
private readonly AsyncLock _clientLock = new AsyncLock();
private TcpClient _client;
private int _disposeCount;
private readonly IStreamDecorator _streamDecorator;

/// <summary>
/// Construct socket and open connection to a specified server.
/// </summary>
/// <param name="log">Logging facility for verbose messaging of actions.</param>
/// <param name="endpoint">The IP endpoint to connect to.</param>
/// <param name="maximumReconnectionTimeout">The maximum time to wait when backing off on reconnection attempts.</param>
public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumReconnectionTimeout = null)
/// <param name="streamDecorator">The stream decorator.</param>
public KafkaTcpSocket(IKafkaLog log, KafkaEndpoint endpoint, TimeSpan? maximumReconnectionTimeout = null, IStreamDecorator streamDecorator = null)
{
_streamDecorator = streamDecorator;
_log = log;
_endpoint = endpoint;
_maximumReconnectionTimeout = maximumReconnectionTimeout ?? TimeSpan.FromMinutes(MaxReconnectionTimeoutMinutes);
Expand Down Expand Up @@ -167,7 +171,7 @@ private void DedicatedSocketTask()
}
}

private void ProcessNetworkstreamTasks(NetworkStream netStream)
private void ProcessNetworkstreamTasks(Stream netStream)
{
Task writeTask = Task.FromResult(true);
Task readTask = Task.FromResult(true);
Expand Down Expand Up @@ -196,7 +200,7 @@ private void ProcessNetworkstreamTasks(NetworkStream netStream)
}
}

private async Task ProcessReadTaskAsync(NetworkStream netStream, SocketPayloadReadTask readTask)
private async Task ProcessReadTaskAsync(Stream netStream, SocketPayloadReadTask readTask)
{
using (readTask)
{
Expand Down Expand Up @@ -266,7 +270,7 @@ private async Task ProcessReadTaskAsync(NetworkStream netStream, SocketPayloadRe
}
}

private async Task ProcessSentTasksAsync(NetworkStream netStream, SocketPayloadSendTask sendTask)
private async Task ProcessSentTasksAsync(Stream netStream, SocketPayloadSendTask sendTask)
{
if (sendTask == null) return;

Expand Down Expand Up @@ -305,7 +309,7 @@ private async Task ProcessSentTasksAsync(NetworkStream netStream, SocketPayloadS
}
}

private async Task<NetworkStream> GetStreamAsync()
private async Task<Stream> GetStreamAsync()
{
//using a semaphore here to allow async waiting rather than blocking locks
using (await _clientLock.LockAsync(_disposeToken.Token).ConfigureAwait(false))
Expand All @@ -314,8 +318,12 @@ private async Task<NetworkStream> GetStreamAsync()
{
_client = await ReEstablishConnectionAsync().ConfigureAwait(false);
}

return _client == null ? null : _client.GetStream();
var currentStream = _client == null ? null : _client.GetStream();
if (_streamDecorator != null)
{
return _streamDecorator.WrapStream(currentStream);
}
return currentStream;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/kafka-net/Model/KafkaOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,7 @@ public KafkaOptions(params Uri[] kafkaServerUri)
KafkaConnectionFactory = new DefaultKafkaConnectionFactory();
ResponseTimeoutMs = TimeSpan.FromMilliseconds(DefaultResponseTimeout);
}

public IStreamDecorator StreamDecorator { get; set; }
}
}
1 change: 1 addition & 0 deletions src/kafka-net/kafka-net.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<Compile Include="Common\AsyncCollection.cs" />
<Compile Include="Common\ConcurrentCircularBuffer.cs" />
<Compile Include="Common\KafkaMessagePacker.cs" />
<Compile Include="Interfaces\IStreamDecorator.cs" />
<Compile Include="Statistics\StatisticsTracker.cs" />
<Compile Include="Default\ConsoleLog.cs" />
<Compile Include="Default\DefaultKafkaConnectionFactory.cs" />
Expand Down
6 changes: 3 additions & 3 deletions src/kafka-tests/Fakes/BrokerRouterProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ public BrokerRouterProxy(MoqMockingKernel kernel)
_fakeConn1.MetadataResponseFunction = () => MetadataResponse();
_fakeConn1.OffsetResponseFunction = () => new OffsetResponse { Offsets = new List<long> { 0, 100 }, PartitionId = 1, Topic = TestTopic };
_fakeConn1.FetchResponseFunction = () => { Thread.Sleep(500); return null; };

_mockKafkaConnectionFactory = _kernel.GetMock<IKafkaConnectionFactory>();
_mockKafkaConnectionFactory.Setup(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 1), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null)).Returns(() => _fakeConn0);
_mockKafkaConnectionFactory.Setup(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 2), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null)).Returns(() => _fakeConn1);
_mockKafkaConnectionFactory.Setup(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 1), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null, null)).Returns(() => _fakeConn0);
_mockKafkaConnectionFactory.Setup(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 2), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null, null)).Returns(() => _fakeConn1);
_mockKafkaConnectionFactory.Setup(x => x.Resolve(It.IsAny<Uri>(), It.IsAny<IKafkaLog>()))
.Returns<Uri, IKafkaLog>((uri, log) => new KafkaEndpoint
{
Expand Down
24 changes: 24 additions & 0 deletions src/kafka-tests/Fakes/FakeStreamDecorator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.IO;
using KafkaNet;

namespace kafka_tests.Fakes
{
public class FakeStreamDecorator:IStreamDecorator
{
public int CalledCount { get; set; }

public void Reset()
{
CalledCount = 0;
}

public Type WrapStreamType { get; private set; }

public Stream WrapStream(Stream stream)
{
CalledCount++;
return stream;
}
}
}
3 changes: 2 additions & 1 deletion src/kafka-tests/Unit/AsyncLockTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ public void AsyncLockShouldUnlockEvenFromDifferentThreads()
Assert.That(count, Is.EqualTo(1), "Only one task should have gotten past lock.");

block.Release();
TaskTest.WaitFor(() => count > 1);
//NOTE: this test is related to a timing issue of the test itself. need to give more time on the WaitFor to ensure Increment can happen
TaskTest.WaitFor(() => count > 1, 6000);
Assert.That(count, Is.EqualTo(2), "Second call should get past lock.");
}

Expand Down
4 changes: 2 additions & 2 deletions src/kafka-tests/Unit/BrokerRouterTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void Setup()
_mockPartitionSelector = _kernel.GetMock<IPartitionSelector>();
_mockKafkaConnection1 = _kernel.GetMock<IKafkaConnection>();
_mockKafkaConnectionFactory = _kernel.GetMock<IKafkaConnectionFactory>();
_mockKafkaConnectionFactory.Setup(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 1), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null)).Returns(() => _mockKafkaConnection1.Object);
_mockKafkaConnectionFactory.Setup(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 1), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null, null)).Returns(() => _mockKafkaConnection1.Object);
_mockKafkaConnectionFactory.Setup(x => x.Resolve(It.IsAny<Uri>(), It.IsAny<IKafkaLog>()))
.Returns<Uri, IKafkaLog>((uri, log) => new KafkaEndpoint
{
Expand Down Expand Up @@ -85,7 +85,7 @@ public void BrokerRouterUsesFactoryToAddNewBrokers()
.Returns(() => Task.Run(() => new List<MetadataResponse> { CreateMetaResponse() }));

var topics = router.GetTopicMetadata(TestTopic);
_mockKafkaConnectionFactory.Verify(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 2), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null), Times.Once());
_mockKafkaConnectionFactory.Verify(x => x.Create(It.Is<KafkaEndpoint>(e => e.Endpoint.Port == 2), It.IsAny<TimeSpan>(), It.IsAny<IKafkaLog>(), null,null), Times.Once());
}

#region MetadataRequest Tests...
Expand Down
27 changes: 27 additions & 0 deletions src/kafka-tests/Unit/KafkaTcpSocketTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,33 @@ public void WriteAndReadShouldBeAsynchronous()
}
}

[Test]
public void WriteAndReadShouldUseStreamDecoratorIfProvided()
{
var write = new List<int>();
var read = new List<int>();
var expected = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
var decorator = new FakeStreamDecorator();
using (var server = new FakeTcpServer(FakeServerPort))
using (var test = new KafkaTcpSocket(new DefaultTraceLog(), _fakeServerUrl, streamDecorator: decorator))
{
server.OnBytesReceived += data => write.AddRange(data.Batch(4).Select(x => x.ToArray().ToInt32()));

var tasks = Enumerable.Range(1, 10)
.SelectMany(i => new[]
{
test.WriteAsync(i.ToBytes().ToPayload()),
test.ReadAsync(4).ContinueWith(t => read.Add(t.Result.ToInt32())),
server.SendDataAsync(i.ToBytes())
}).ToArray();

Task.WaitAll(tasks);
Assert.That(write.OrderBy(x => x), Is.EqualTo(expected));
Assert.That(read.OrderBy(x => x), Is.EqualTo(expected));
Assert.That(decorator.CalledCount, Is.EqualTo(1));
}
}

[Test]
public void WriteShouldHandleLargeVolumeSendAsynchronously()
{
Expand Down
2 changes: 1 addition & 1 deletion src/kafka-tests/Unit/ProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void ProducerShouldReportCorrectAmountOfAsyncRequests()
Assert.That(producer.AsyncCount, Is.EqualTo(1), "One async operation should be sending.");

semaphore.Release();
sendTask.Wait(TimeSpan.FromMilliseconds(500));
sendTask.Wait(TimeSpan.FromMilliseconds(1000));

Assert.That(sendTask.IsCompleted, Is.True, "Send task should be marked as completed.");
Assert.That(producer.AsyncCount, Is.EqualTo(0), "Async should now show zero count.");
Expand Down
1 change: 1 addition & 0 deletions src/kafka-tests/kafka-tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<Compile Include="Fakes\FakeBrokerRouter.cs" />
<Compile Include="Fakes\BrokerRouterProxy.cs" />
<Compile Include="Fakes\FakeKafkaConnection.cs" />
<Compile Include="Fakes\FakeStreamDecorator.cs" />
<Compile Include="Fakes\FakeTcpServer.cs" />
<Compile Include="Helpers\IntegrationConfig.cs" />
<Compile Include="Helpers\MessageHelper.cs" />
Expand Down