diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md
index 264c0d52b..5eb4aae41 100644
--- a/docs/ReleaseNotes.md
+++ b/docs/ReleaseNotes.md
@@ -9,6 +9,8 @@ Current package versions:
## Unreleased
No pending unreleased changes
+- Add support for sharded pub/sub via `RedisChannel.Sharded` - ([#2887 by vandyvilla, atakavci and mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2887))
+
## 2.8.37
- Add `ConfigurationOptions.SetUserPemCertificate(...)` and `ConfigurationOptions.SetUserPfxCertificate(...)` methods to simplify using client certificates ([#2873 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2873))
diff --git a/docs/Timeouts.md b/docs/Timeouts.md
index 1c4ac3756..ea9830041 100644
--- a/docs/Timeouts.md
+++ b/docs/Timeouts.md
@@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel
|qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.|
|aw | Active-Writer: {bool}||
|bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted|
-|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
+|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA|
|in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis|
|in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read|
diff --git a/src/StackExchange.Redis/ClientInfo.cs b/src/StackExchange.Redis/ClientInfo.cs
index f04058495..c5ce0d0bf 100644
--- a/src/StackExchange.Redis/ClientInfo.cs
+++ b/src/StackExchange.Redis/ClientInfo.cs
@@ -129,10 +129,15 @@ public sealed class ClientInfo
public string? Name { get; private set; }
///
- /// Number of pattern matching subscriptions.
+ /// Number of pattern-matching subscriptions.
///
public int PatternSubscriptionCount { get; private set; }
+ ///
+ /// Number of sharded subscriptions.
+ ///
+ public int ShardedSubscriptionCount { get; private set; }
+
///
/// The port of the client.
///
@@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[
case "name": client.Name = value; break;
case "sub": client.SubscriptionCount = Format.ParseInt32(value); break;
case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break;
+ case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break;
case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break;
case "cmd": client.LastCommand = value; break;
case "flags":
diff --git a/src/StackExchange.Redis/CommandMap.cs b/src/StackExchange.Redis/CommandMap.cs
index d1e125bb3..31974cab9 100644
--- a/src/StackExchange.Redis/CommandMap.cs
+++ b/src/StackExchange.Redis/CommandMap.cs
@@ -31,7 +31,7 @@ public sealed class CommandMap
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
- RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
+ RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,
RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
@@ -57,7 +57,9 @@ public sealed class CommandMap
RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!
- RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
+ RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,
+
+ RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,
RedisCommand.SCRIPT,
diff --git a/src/StackExchange.Redis/Enums/RedisCommand.cs b/src/StackExchange.Redis/Enums/RedisCommand.cs
index a4647d7eb..3909be4c2 100644
--- a/src/StackExchange.Redis/Enums/RedisCommand.cs
+++ b/src/StackExchange.Redis/Enums/RedisCommand.cs
@@ -181,6 +181,7 @@ internal enum RedisCommand
SORT,
SORT_RO,
SPOP,
+ SPUBLISH,
SRANDMEMBER,
SREM,
STRLEN,
@@ -188,6 +189,8 @@ internal enum RedisCommand
SUNION,
SUNIONSTORE,
SSCAN,
+ SSUBSCRIBE,
+ SUNSUBSCRIBE,
SWAPDB,
SYNC,
@@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.SMEMBERS:
case RedisCommand.SMISMEMBER:
case RedisCommand.SORT_RO:
+ case RedisCommand.SPUBLISH:
case RedisCommand.SRANDMEMBER:
+ case RedisCommand.SSUBSCRIBE:
case RedisCommand.STRLEN:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SUNION:
+ case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SSCAN:
case RedisCommand.SYNC:
case RedisCommand.TIME:
diff --git a/src/StackExchange.Redis/Interfaces/ISubscriber.cs b/src/StackExchange.Redis/Interfaces/ISubscriber.cs
index e0c509f49..a9c0bf298 100644
--- a/src/StackExchange.Redis/Interfaces/ISubscriber.cs
+++ b/src/StackExchange.Redis/Interfaces/ISubscriber.cs
@@ -110,6 +110,7 @@ public interface ISubscriber : IRedis
/// See
/// ,
/// .
+ /// .
///
void UnsubscribeAll(CommandFlags flags = CommandFlags.None);
diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs
index b89a6b946..fd75585a5 100644
--- a/src/StackExchange.Redis/Message.cs
+++ b/src/StackExchange.Redis/Message.cs
@@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command)
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SUBSCRIBE:
+ case RedisCommand.SPUBLISH:
+ case RedisCommand.SSUBSCRIBE:
+ case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SWAPDB:
case RedisCommand.SYNC:
case RedisCommand.TIME:
diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs
index a10b241cb..f5d75c188 100644
--- a/src/StackExchange.Redis/PhysicalBridge.cs
+++ b/src/StackExchange.Redis/PhysicalBridge.cs
@@ -124,9 +124,12 @@ public enum State : byte
public RedisCommand LastCommand { get; private set; }
///
- /// If we have a connection, report the protocol being used.
+ /// If we have (or had) a connection, report the protocol being used.
///
- public RedisProtocol? Protocol => physical?.Protocol;
+ /// The value remains after disconnect, so that appropriate follow-up actions (pub/sub etc) can work reliably.
+ public RedisProtocol? Protocol => _protocol == 0 ? default(RedisProtocol?) : _protocol;
+ private RedisProtocol _protocol; // note starts at zero, not RESP2
+ internal void SetProtocol(RedisProtocol protocol) => _protocol = protocol;
public void Dispose()
{
diff --git a/src/StackExchange.Redis/PhysicalConnection.cs b/src/StackExchange.Redis/PhysicalConnection.cs
index 51fac7c3d..c92290696 100644
--- a/src/StackExchange.Redis/PhysicalConnection.cs
+++ b/src/StackExchange.Redis/PhysicalConnection.cs
@@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable
private const int DefaultRedisDatabaseCount = 16;
- private static readonly CommandBytes message = "message", pmessage = "pmessage";
+ private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
@@ -276,7 +276,11 @@ private enum ReadMode : byte
private RedisProtocol _protocol; // note starts at **zero**, not RESP2
public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol;
- internal void SetProtocol(RedisProtocol value) => _protocol = value;
+ internal void SetProtocol(RedisProtocol value)
+ {
+ _protocol = value;
+ BridgeCouldBeNull?.SetProtocol(value);
+ }
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
internal void Shutdown()
@@ -384,7 +388,7 @@ public void RecordConnectionFailed(
bool isInitialConnect = false,
IDuplexPipe? connectingPipe = null)
{
- bool weAskedForThis = false;
+ bool weAskedForThis;
Exception? outerException = innerException;
IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
@@ -1644,9 +1648,9 @@ private void MatchResult(in RawResult result)
// out of band message does not match to a queued message
var items = result.GetItems();
- if (items.Length >= 3 && items[0].IsEqual(message))
+ if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
{
- _readStatus = ReadStatus.PubSubMessage;
+ _readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
@@ -1668,8 +1672,17 @@ private void MatchResult(in RawResult result)
}
// invoke the handlers
- var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
- Trace("MESSAGE: " + channel);
+ RedisChannel channel;
+ if (items[0].IsEqual(message))
+ {
+ channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
+ Trace("MESSAGE: " + channel);
+ }
+ else // see check on outer-if that restricts to message / smessage
+ {
+ channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
+ Trace("SMESSAGE: " + channel);
+ }
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[2], out var payload))
@@ -1690,19 +1703,22 @@ private void MatchResult(in RawResult result)
{
_readStatus = ReadStatus.PubSubPMessage;
- var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
+ var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
+
Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
{
- var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
+ var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
+
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
- var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
+ var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
+
_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
}
@@ -1710,7 +1726,7 @@ private void MatchResult(in RawResult result)
return; // AND STOP PROCESSING!
}
- // if it didn't look like "[p]message", then we still need to process the pending queue
+ // if it didn't look like "[p|s]message", then we still need to process the pending queue
}
Trace("Matching result...");
@@ -2110,6 +2126,7 @@ internal enum ReadStatus
MatchResult,
PubSubMessage,
PubSubPMessage,
+ PubSubSMessage,
Reconfigure,
InvokePubSub,
ResponseSequenceCheck, // high-integrity mode only
diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
index 8263defd3..683332070 100644
--- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
+++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
@@ -1309,6 +1309,7 @@ StackExchange.Redis.RedisChannel
StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool
StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool
StackExchange.Redis.RedisChannel.IsPattern.get -> bool
+StackExchange.Redis.RedisChannel.IsSharded.get -> bool
StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode
@@ -1893,4 +1894,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int
virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult!
StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
+StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool
+static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel
+static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel
+StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int
StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void
diff --git a/src/StackExchange.Redis/RawResult.cs b/src/StackExchange.Redis/RawResult.cs
index 300503f57..55c44652b 100644
--- a/src/StackExchange.Redis/RawResult.cs
+++ b/src/StackExchange.Redis/RawResult.cs
@@ -161,7 +161,7 @@ public bool MoveNext()
}
public ReadOnlySequence Current { get; private set; }
}
- internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode)
+ internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options)
{
switch (Resp2TypeBulkString)
{
@@ -169,12 +169,13 @@ internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.Pattern
case ResultType.BulkString:
if (channelPrefix == null)
{
- return new RedisChannel(GetBlob(), mode);
+ return new RedisChannel(GetBlob(), options);
}
if (StartsWith(channelPrefix))
{
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
- return new RedisChannel(copy, mode);
+
+ return new RedisChannel(copy, options);
}
return default;
default:
diff --git a/src/StackExchange.Redis/RedisChannel.cs b/src/StackExchange.Redis/RedisChannel.cs
index 561cce21f..f6debd1eb 100644
--- a/src/StackExchange.Redis/RedisChannel.cs
+++ b/src/StackExchange.Redis/RedisChannel.cs
@@ -9,7 +9,18 @@ namespace StackExchange.Redis
public readonly struct RedisChannel : IEquatable
{
internal readonly byte[]? Value;
- internal readonly bool _isPatternBased;
+
+ internal readonly RedisChannelOptions Options;
+
+ [Flags]
+ internal enum RedisChannelOptions
+ {
+ None = 0,
+ Pattern = 1 << 0,
+ Sharded = 1 << 1,
+ }
+
+ internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;
///
/// Indicates whether the channel-name is either null or a zero-length value.
@@ -19,7 +30,12 @@ namespace StackExchange.Redis
///
/// Indicates whether this channel represents a wildcard pattern (see PSUBSCRIBE).
///
- public bool IsPattern => _isPatternBased;
+ public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0;
+
+ ///
+ /// Indicates whether this channel represents a shard channel (see SSUBSCRIBE).
+ ///
+ public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0;
internal bool IsNull => Value == null;
@@ -59,19 +75,35 @@ public static bool UseImplicitAutoPattern
///
/// The name of the channel to create.
/// The mode for name matching.
- public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { }
+ public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
+ {
+ }
///
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
///
/// The string name of the channel to create.
/// The mode for name matching.
- public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
+ public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
+ {
+ }
+
+ ///
+ /// Create a new redis channel from a buffer, representing a sharded channel.
+ ///
+ /// The name of the channel to create.
+ public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);
+
+ ///
+ /// Create a new redis channel from a string, representing a sharded channel.
+ ///
+ /// The string name of the channel to create.
+ public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);
- private RedisChannel(byte[]? value, bool isPatternBased)
+ internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
Value = value;
- _isPatternBased = isPatternBased;
+ Options = options;
}
private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
@@ -123,7 +155,7 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// The first to compare.
/// The second to compare.
public static bool operator ==(RedisChannel x, RedisChannel y) =>
- x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value);
+ x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);
///
/// Indicate whether two channel names are equal.
@@ -171,10 +203,10 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// Indicate whether two channel names are equal.
///
/// The to compare to.
- public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value);
+ public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);
///
- public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0);
+ public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;
///
/// Obtains a string representation of the channel name.
@@ -203,7 +235,7 @@ internal RedisChannel Clone()
return this;
}
var copy = (byte[])Value.Clone(); // defensive array copy
- return new RedisChannel(copy, _isPatternBased);
+ return new RedisChannel(copy, Options);
}
///
diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs
index 7468bdb64..716176662 100644
--- a/src/StackExchange.Redis/RedisDatabase.cs
+++ b/src/StackExchange.Redis/RedisDatabase.cs
@@ -1575,14 +1575,14 @@ public Task StringLongestCommonSubsequenceWithMatchesAsync(Redis
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
- var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
+ var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
if (channel.IsNullOrEmpty) throw new ArgumentNullException(nameof(channel));
- var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
+ var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
diff --git a/src/StackExchange.Redis/RedisFeatures.cs b/src/StackExchange.Redis/RedisFeatures.cs
index 225516433..faba07e68 100644
--- a/src/StackExchange.Redis/RedisFeatures.cs
+++ b/src/StackExchange.Redis/RedisFeatures.cs
@@ -186,6 +186,11 @@ public RedisFeatures(Version version)
///
public bool SetVaradicAddRemove => Version.IsAtLeast(v2_4_0);
+ ///
+ /// Are SSUBSCRIBE and SPUBLISH available?
+ ///
+ public bool ShardedPubSub => Version.IsAtLeast(v7_0_0_rc1);
+
///
/// Are ZPOPMIN and ZPOPMAX available?
///
diff --git a/src/StackExchange.Redis/RedisSubscriber.cs b/src/StackExchange.Redis/RedisSubscriber.cs
index ee28f4c56..b641baf05 100644
--- a/src/StackExchange.Redis/RedisSubscriber.cs
+++ b/src/StackExchange.Redis/RedisSubscriber.cs
@@ -182,15 +182,25 @@ public Subscription(CommandFlags flags)
///
internal Message GetMessage(RedisChannel channel, SubscriptionAction action, CommandFlags flags, bool internalCall)
{
- var isPattern = channel._isPatternBased;
+ var isPattern = channel.IsPattern;
+ var isSharded = channel.IsSharded;
var command = action switch
{
- SubscriptionAction.Subscribe when isPattern => RedisCommand.PSUBSCRIBE,
- SubscriptionAction.Unsubscribe when isPattern => RedisCommand.PUNSUBSCRIBE,
-
- SubscriptionAction.Subscribe when !isPattern => RedisCommand.SUBSCRIBE,
- SubscriptionAction.Unsubscribe when !isPattern => RedisCommand.UNSUBSCRIBE,
- _ => throw new ArgumentOutOfRangeException(nameof(action), "This would be an impressive boolean feat"),
+ SubscriptionAction.Subscribe => channel.Options switch
+ {
+ RedisChannel.RedisChannelOptions.None => RedisCommand.SUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SSUBSCRIBE,
+ _ => Unknown(action, channel.Options),
+ },
+ SubscriptionAction.Unsubscribe => channel.Options switch
+ {
+ RedisChannel.RedisChannelOptions.None => RedisCommand.UNSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Pattern => RedisCommand.PUNSUBSCRIBE,
+ RedisChannel.RedisChannelOptions.Sharded => RedisCommand.SUNSUBSCRIBE,
+ _ => Unknown(action, channel.Options),
+ },
+ _ => Unknown(action, channel.Options),
};
// TODO: Consider flags here - we need to pass Fire and Forget, but don't want to intermingle Primary/Replica
@@ -203,6 +213,9 @@ internal Message GetMessage(RedisChannel channel, SubscriptionAction action, Com
return msg;
}
+ private RedisCommand Unknown(SubscriptionAction action, RedisChannel.RedisChannelOptions options)
+ => throw new ArgumentException($"Unable to determine pub/sub operation for '{action}' against '{options}'");
+
public void Add(Action? handler, ChannelMessageQueue? queue)
{
if (handler != null)
@@ -370,14 +383,14 @@ private static void ThrowIfNull(in RedisChannel channel)
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
- var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
+ var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteSync(msg, ResultProcessor.Int64);
}
public Task PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
{
ThrowIfNull(channel);
- var msg = Message.Create(-1, flags, RedisCommand.PUBLISH, channel, message);
+ var msg = Message.Create(-1, flags, channel.PublishCommand, channel, message);
return ExecuteAsync(msg, ResultProcessor.Int64);
}
@@ -515,6 +528,7 @@ private bool UnregisterSubscription(in RedisChannel channel, Action
public static readonly ResultProcessor PersistResultArray = new PersistResultArrayProcessor();
public static readonly ResultProcessor
- RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.PatternMode.Literal);
+ RedisChannelArrayLiteral = new RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions.None);
public static readonly ResultProcessor
RedisKey = new RedisKeyProcessor();
@@ -354,8 +354,7 @@ public bool TryParse(in RawResult result, out TimeSpan? expiry)
switch (result.Resp2TypeBulkString)
{
case ResultType.Integer:
- long time;
- if (result.TryGetInt64(out time))
+ if (result.TryGetInt64(out long time))
{
if (time < 0)
{
@@ -469,7 +468,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
var newServer = message.Command switch
{
- RedisCommand.SUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
+ RedisCommand.SUBSCRIBE or RedisCommand.SSUBSCRIBE or RedisCommand.PSUBSCRIBE => connection.BridgeCouldBeNull?.ServerEndPoint,
_ => null,
};
Subscription?.SetCurrentServer(newServer);
@@ -1253,8 +1252,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
switch (result.Resp2TypeBulkString)
{
case ResultType.Integer:
- long i64;
- if (result.TryGetInt64(out i64))
+ if (result.TryGetInt64(out long i64))
{
SetResult(message, i64);
return true;
@@ -1262,8 +1260,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
break;
case ResultType.SimpleString:
case ResultType.BulkString:
- double val;
- if (result.TryGetDouble(out val))
+ if (result.TryGetDouble(out double val))
{
SetResult(message, val);
return true;
@@ -1366,8 +1363,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
case ResultType.Integer:
case ResultType.SimpleString:
case ResultType.BulkString:
- long i64;
- if (result.TryGetInt64(out i64))
+ if (result.TryGetInt64(out long i64))
{
SetResult(message, i64);
return true;
@@ -1423,8 +1419,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
SetResult(message, null);
return true;
}
- double val;
- if (result.TryGetDouble(out val))
+ if (result.TryGetDouble(out double val))
{
SetResult(message, val);
return true;
@@ -1449,8 +1444,7 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
SetResult(message, null);
return true;
}
- long i64;
- if (result.TryGetInt64(out i64))
+ if (result.TryGetInt64(out long i64))
{
SetResult(message, i64);
return true;
@@ -1504,20 +1498,20 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
private sealed class RedisChannelArrayProcessor : ResultProcessor
{
- private readonly RedisChannel.PatternMode mode;
- public RedisChannelArrayProcessor(RedisChannel.PatternMode mode)
+ private readonly RedisChannel.RedisChannelOptions options;
+ public RedisChannelArrayProcessor(RedisChannel.RedisChannelOptions options)
{
- this.mode = mode;
+ this.options = options;
}
private readonly struct ChannelState // I would use a value-tuple here, but that is binding hell
{
public readonly byte[]? Prefix;
- public readonly RedisChannel.PatternMode Mode;
- public ChannelState(byte[]? prefix, RedisChannel.PatternMode mode)
+ public readonly RedisChannel.RedisChannelOptions Options;
+ public ChannelState(byte[]? prefix, RedisChannel.RedisChannelOptions options)
{
Prefix = prefix;
- Mode = mode;
+ Options = options;
}
}
protected override bool SetResultCore(PhysicalConnection connection, Message message, in RawResult result)
@@ -1526,8 +1520,8 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
{
case ResultType.Array:
var final = result.ToArray(
- (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Mode),
- new ChannelState(connection.ChannelPrefix, mode))!;
+ (in RawResult item, in ChannelState state) => item.AsRedisChannel(state.Prefix, state.Options),
+ new ChannelState(connection.ChannelPrefix, options))!;
SetResult(message, final);
return true;
@@ -2167,7 +2161,10 @@ private sealed class RedisStreamInterleavedProcessor : ValuePairInterleavedProce
protected override bool AllowJaggedPairs => false; // we only use this on a flattened map
public static readonly RedisStreamInterleavedProcessor Instance = new();
- private RedisStreamInterleavedProcessor() { }
+ private RedisStreamInterleavedProcessor()
+ {
+ }
+
protected override RedisStream Parse(in RawResult first, in RawResult second, object? state)
=> new(key: first.AsRedisKey(), entries: ((MultiStreamProcessor)state!).ParseRedisStreamEntries(second));
}
@@ -2549,7 +2546,10 @@ protected override bool SetResultCore(PhysicalConnection connection, Message mes
internal class StreamNameValueEntryProcessor : ValuePairInterleavedProcessorBase
{
public static readonly StreamNameValueEntryProcessor Instance = new();
- private StreamNameValueEntryProcessor() { }
+ private StreamNameValueEntryProcessor()
+ {
+ }
+
protected override NameValueEntry Parse(in RawResult first, in RawResult second, object? state)
=> new NameValueEntry(first.AsRedisValue(), second.AsRedisValue());
}
diff --git a/src/StackExchange.Redis/ServerEndPoint.cs b/src/StackExchange.Redis/ServerEndPoint.cs
index 8b099afd2..55fe6cff3 100644
--- a/src/StackExchange.Redis/ServerEndPoint.cs
+++ b/src/StackExchange.Redis/ServerEndPoint.cs
@@ -260,6 +260,8 @@ public void Dispose()
case RedisCommand.UNSUBSCRIBE:
case RedisCommand.PSUBSCRIBE:
case RedisCommand.PUNSUBSCRIBE:
+ case RedisCommand.SSUBSCRIBE:
+ case RedisCommand.SUNSUBSCRIBE:
message.SetForSubscriptionBridge();
break;
}
@@ -278,6 +280,8 @@ public void Dispose()
case RedisCommand.UNSUBSCRIBE:
case RedisCommand.PSUBSCRIBE:
case RedisCommand.PUNSUBSCRIBE:
+ case RedisCommand.SSUBSCRIBE:
+ case RedisCommand.SUNSUBSCRIBE:
if (!KnowOrAssumeResp3())
{
return subscription ?? (create ? subscription = CreateBridge(ConnectionType.Subscription, null) : null);
@@ -632,6 +636,10 @@ internal void OnDisconnected(PhysicalBridge bridge)
if (bridge == interactive)
{
CompletePendingConnectionMonitors("Disconnected");
+ if (Protocol is RedisProtocol.Resp3)
+ {
+ Multiplexer.UpdateSubscriptions();
+ }
}
else if (bridge == subscription)
{
diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs
index 742ce51bb..b1f9d01cd 100644
--- a/tests/StackExchange.Redis.Tests/ClusterTests.cs
+++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs
@@ -15,7 +15,9 @@ namespace StackExchange.Redis.Tests;
[Collection(SharedConnectionFixture.Key)]
public class ClusterTests : TestBase
{
- public ClusterTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture) { }
+ public ClusterTests(ITestOutputHelper output, SharedConnectionFixture fixture) : base(output, fixture)
+ {
+ }
protected override string GetConfiguration() => TestConfig.Current.ClusterServersAndPorts + ",connectTimeout=10000";
@@ -746,4 +748,224 @@ public void ConnectIncludesSubscriber()
Assert.Equal(PhysicalBridge.State.ConnectedEstablished, server.SubscriptionConnectionState);
}
}
+
+ [Fact]
+ public async Task TestShardedPubsubSubscriberAgainstReconnects()
+ {
+ var channel = RedisChannel.Sharded(Me());
+ using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false, require: RedisFeatures.v7_0_0_rc1);
+ Assert.True(conn.IsConnected);
+ var db = conn.GetDatabase();
+ Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis"));
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+
+ var pubsub = conn.GetSubscriber();
+ List<(RedisChannel, RedisValue)> received = new();
+ var queue = await pubsub.SubscribeAsync(channel);
+ _ = Task.Run(async () =>
+ {
+ // use queue API to have control over order
+ await foreach (var item in queue)
+ {
+ lock (received)
+ {
+ if (item.Channel.IsSharded && item.Channel == channel) received.Add((item.Channel, item.Message));
+ }
+ }
+ });
+ Assert.Equal(1, conn.GetSubscriptionsCount());
+
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+ await db.PingAsync();
+
+ for (int i = 0; i < 5; i++)
+ {
+ // check we get a hit
+ Assert.Equal(1, await db.PublishAsync(channel, i.ToString()));
+ }
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+
+ // this is endpoint at index 1 which has the hashslot for "testShardChannel"
+ var server = conn.GetServer(conn.GetEndPoints()[1]);
+ server.SimulateConnectionFailure(SimulatedFailureType.All);
+ SetExpectedAmbientFailureCount(2);
+
+ await Task.Delay(4000);
+ for (int i = 0; i < 5; i++)
+ {
+ // check we get a hit
+ Assert.Equal(1, await db.PublishAsync(channel, i.ToString()));
+ }
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+
+ Assert.Equal(1, conn.GetSubscriptionsCount());
+ Assert.Equal(10, received.Count);
+ ClearAmbientFailures();
+ }
+
+ [Fact]
+ public async Task TestShardedPubsubSubscriberAgainsHashSlotMigration()
+ {
+ var channel = RedisChannel.Sharded(Me());
+ using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 3000, shared: false, require: RedisFeatures.v7_0_0_rc1);
+ Assert.True(conn.IsConnected);
+ var db = conn.GetDatabase();
+ Assert.Equal(0, await db.PublishAsync(channel, "noClientReceivesThis"));
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+
+ var pubsub = conn.GetSubscriber();
+ List<(RedisChannel, RedisValue)> received = new();
+ var queue = await pubsub.SubscribeAsync(channel);
+ _ = Task.Run(async () =>
+ {
+ // use queue API to have control over order
+ await foreach (var item in queue)
+ {
+ lock (received)
+ {
+ if (item.Channel.IsSharded && item.Channel == channel) received.Add((item.Channel, item.Message));
+ }
+ }
+ });
+ Assert.Equal(1, conn.GetSubscriptionsCount());
+
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+ await db.PingAsync();
+
+ for (int i = 0; i < 5; i++)
+ {
+ // check we get a hit
+ Assert.Equal(1, await db.PublishAsync(channel, i.ToString()));
+ }
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+
+ // lets migrate the slot for "testShardChannel" to another node
+ DoHashSlotMigration();
+
+ await Task.Delay(4000);
+ for (int i = 0; i < 5; i++)
+ {
+ // check we get a hit
+ Assert.Equal(1, await db.PublishAsync(channel, i.ToString()));
+ }
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+
+ Assert.Equal(1, conn.GetSubscriptionsCount());
+ Assert.Equal(10, received.Count);
+ RollbackHashSlotMigration();
+ ClearAmbientFailures();
+ }
+
+ private void DoHashSlotMigration()
+ {
+ MigrateSlotForTestShardChannel(false);
+ }
+ private void RollbackHashSlotMigration()
+ {
+ MigrateSlotForTestShardChannel(true);
+ }
+
+ private void MigrateSlotForTestShardChannel(bool rollback)
+ {
+ int hashSlotForTestShardChannel = 7177;
+ using var conn = Create(allowAdmin: true, keepAlive: 1, connectTimeout: 5000, shared: false);
+ var servers = conn.GetServers();
+ IServer? serverWithPort7000 = null;
+ IServer? serverWithPort7001 = null;
+
+ string nodeIdForPort7000 = "780813af558af81518e58e495d63b6e248e80adf";
+ string nodeIdForPort7001 = "ea828c6074663c8bd4e705d3e3024d9d1721ef3b";
+ foreach (var server in servers)
+ {
+ string id = server.Execute("CLUSTER", "MYID").ToString();
+ if (id == nodeIdForPort7000)
+ {
+ serverWithPort7000 = server;
+ }
+ if (id == nodeIdForPort7001)
+ {
+ serverWithPort7001 = server;
+ }
+ }
+
+ IServer fromServer, toServer;
+ string fromNode, toNode;
+ if (rollback)
+ {
+ fromServer = serverWithPort7000!;
+ fromNode = nodeIdForPort7000;
+ toServer = serverWithPort7001!;
+ toNode = nodeIdForPort7001;
+ }
+ else
+ {
+ fromServer = serverWithPort7001!;
+ fromNode = nodeIdForPort7001;
+ toServer = serverWithPort7000!;
+ toNode = nodeIdForPort7000;
+ }
+
+ Assert.Equal("OK", toServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "IMPORTING", fromNode).ToString());
+ Assert.Equal("OK", fromServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "MIGRATING", toNode).ToString());
+ Assert.Equal("OK", toServer.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "NODE", toNode).ToString());
+ Assert.Equal("OK", fromServer!.Execute("CLUSTER", "SETSLOT", hashSlotForTestShardChannel, "NODE", toNode).ToString());
+ }
+
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task ClusterPubSub(bool sharded)
+ {
+ var guid = Guid.NewGuid().ToString();
+ var channel = sharded ? RedisChannel.Sharded(guid) : RedisChannel.Literal(guid);
+ using var conn = Create(keepAlive: 1, connectTimeout: 3000, shared: false, require: sharded ? RedisFeatures.v7_0_0_rc1 : RedisFeatures.v2_0_0);
+ Assert.True(conn.IsConnected);
+
+ var pubsub = conn.GetSubscriber();
+ List<(RedisChannel, RedisValue)> received = new();
+ var queue = await pubsub.SubscribeAsync(channel);
+ _ = Task.Run(async () =>
+ {
+ // use queue API to have control over order
+ await foreach (var item in queue)
+ {
+ lock (received)
+ {
+ received.Add((item.Channel, item.Message));
+ }
+ }
+ });
+
+ var db = conn.GetDatabase();
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+ await db.PingAsync();
+ for (int i = 0; i < 10; i++)
+ {
+ // check we get a hit
+ Assert.Equal(1, await db.PublishAsync(channel, i.ToString()));
+ }
+ await Task.Delay(50); // let the sub settle (this isn't needed on RESP3, note)
+ await db.PingAsync();
+ await pubsub.UnsubscribeAsync(channel);
+
+ (RedisChannel Channel, RedisValue Value)[] snap;
+ lock (received)
+ {
+ snap = received.ToArray(); // in case of concurrency
+ }
+ Log("items received: {0}", snap.Length);
+ Assert.Equal(10, snap.Length);
+ // separate log and validate loop here simplifies debugging (ask me how I know!)
+ for (int i = 0; i < 10; i++)
+ {
+ var pair = snap[i];
+ Log("element {0}: {1}/{2}", i, pair.Channel, pair.Value);
+ }
+ for (int i = 0; i < 10; i++)
+ {
+ var pair = snap[i];
+ Assert.Equal(channel, pair.Channel);
+ Assert.Equal(i, pair.Value);
+ }
+ }
}
diff --git a/toys/StackExchange.Redis.Server/RedisRequest.cs b/toys/StackExchange.Redis.Server/RedisRequest.cs
index 54102815c..36d133bab 100644
--- a/toys/StackExchange.Redis.Server/RedisRequest.cs
+++ b/toys/StackExchange.Redis.Server/RedisRequest.cs
@@ -45,8 +45,8 @@ public int GetInt32(int index)
public RedisKey GetKey(int index) => _inner[index].AsRedisKey();
- public RedisChannel GetChannel(int index, RedisChannel.PatternMode mode)
- => _inner[index].AsRedisChannel(null, mode);
+ internal RedisChannel GetChannel(int index, RedisChannel.RedisChannelOptions options)
+ => _inner[index].AsRedisChannel(null, options);
internal bool TryGetCommandBytes(int i, out CommandBytes command)
{
diff --git a/toys/StackExchange.Redis.Server/RedisServer.cs b/toys/StackExchange.Redis.Server/RedisServer.cs
index 63efbfd1b..52728fd44 100644
--- a/toys/StackExchange.Redis.Server/RedisServer.cs
+++ b/toys/StackExchange.Redis.Server/RedisServer.cs
@@ -479,7 +479,7 @@ private TypedRedisValue SubscribeImpl(RedisClient client, RedisRequest request)
int index = 0;
request.TryGetCommandBytes(0, out var cmd);
var cmdString = TypedRedisValue.BulkString(cmd.ToArray());
- var mode = cmd[0] == (byte)'p' ? RedisChannel.PatternMode.Pattern : RedisChannel.PatternMode.Literal;
+ var mode = cmd[0] == (byte)'p' ? RedisChannel.RedisChannelOptions.Pattern : RedisChannel.RedisChannelOptions.None;
for (int i = 1; i < request.Count; i++)
{
var channel = request.GetChannel(i, mode);