Skip to content

Support sharded pubsub commands #2887

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 10, 2025
2 changes: 2 additions & 0 deletions docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Current package versions:
## Unreleased
No pending unreleased changes

- Add support for sharded pub/sub via `RedisChannel.Sharded` - ([#2887 by vandyvilla, atakavci and mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2887))

## 2.8.37

- Add `ConfigurationOptions.SetUserPemCertificate(...)` and `ConfigurationOptions.SetUserPfxCertificate(...)` methods to simplify using client certificates ([#2873 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2873))
Expand Down
2 changes: 1 addition & 1 deletion docs/Timeouts.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ By default Redis Timeout exception(s) includes useful information, which can hel
|qs | Queue-Awaiting-Response : {int}|There are x operations currently awaiting replies from redis server.|
|aw | Active-Writer: {bool}||
|bw | Backlog-Writer: {enum} | Possible values are Inactive, Started, CheckingForWork, CheckingForTimeout, RecordingTimeout, WritingMessage, Flushing, MarkingInactive, RecordingWriteFailure, RecordingFault, SettingIdle, SpinningDown, Faulted|
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|rs | Read-State: {enum}|Possible values are NotStarted, Init, RanToCompletion, Faulted, ReadSync, ReadAsync, UpdateWriteTime, ProcessBuffer, MarkProcessed, TryParseResult, MatchResult, PubSubMessage, PubSubSMessage, PubSubPMessage, Reconfigure, InvokePubSub, DequeueResult, ComputeResult, CompletePendingMessage, NA|
|ws | Write-State: {enum}| Possible values are Initializing, Idle, Writing, Flushing, Flushed, NA|
|in | Inbound-Bytes : {long}|there are x bytes waiting to be read from the input stream from redis|
|in-pipe | Inbound-Pipe-Bytes: {long}|Bytes waiting to be read|
Expand Down
8 changes: 7 additions & 1 deletion src/StackExchange.Redis/ClientInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,15 @@ public sealed class ClientInfo
public string? Name { get; private set; }

/// <summary>
/// Number of pattern matching subscriptions.
/// Number of pattern-matching subscriptions.
/// </summary>
public int PatternSubscriptionCount { get; private set; }

/// <summary>
/// Number of sharded subscriptions.
/// </summary>
public int ShardedSubscriptionCount { get; private set; }

/// <summary>
/// The port of the client.
/// </summary>
Expand Down Expand Up @@ -236,6 +241,7 @@ internal static bool TryParse(string? input, [NotNullWhen(true)] out ClientInfo[
case "name": client.Name = value; break;
case "sub": client.SubscriptionCount = Format.ParseInt32(value); break;
case "psub": client.PatternSubscriptionCount = Format.ParseInt32(value); break;
case "ssub": client.ShardedSubscriptionCount = Format.ParseInt32(value); break;
case "multi": client.TransactionCommandLength = Format.ParseInt32(value); break;
case "cmd": client.LastCommand = value; break;
case "flags":
Expand Down
6 changes: 4 additions & 2 deletions src/StackExchange.Redis/CommandMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public sealed class CommandMap

RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!

RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,

RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,

Expand All @@ -57,7 +57,9 @@ public sealed class CommandMap

RedisCommand.BLPOP, RedisCommand.BRPOP, RedisCommand.BRPOPLPUSH, // yeah, me neither!

RedisCommand.PSUBSCRIBE, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE,
RedisCommand.PSUBSCRIBE, RedisCommand.PUBLISH, RedisCommand.PUNSUBSCRIBE, RedisCommand.SUBSCRIBE, RedisCommand.UNSUBSCRIBE, RedisCommand.SPUBLISH, RedisCommand.SSUBSCRIBE, RedisCommand.SUNSUBSCRIBE,

RedisCommand.DISCARD, RedisCommand.EXEC, RedisCommand.MULTI, RedisCommand.UNWATCH, RedisCommand.WATCH,

RedisCommand.SCRIPT,

Expand Down
6 changes: 6 additions & 0 deletions src/StackExchange.Redis/Enums/RedisCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,16 @@ internal enum RedisCommand
SORT,
SORT_RO,
SPOP,
SPUBLISH,
SRANDMEMBER,
SREM,
STRLEN,
SUBSCRIBE,
SUNION,
SUNIONSTORE,
SSCAN,
SSUBSCRIBE,
SUNSUBSCRIBE,
SWAPDB,
SYNC,

Expand Down Expand Up @@ -447,10 +450,13 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
case RedisCommand.SMEMBERS:
case RedisCommand.SMISMEMBER:
case RedisCommand.SORT_RO:
case RedisCommand.SPUBLISH:
case RedisCommand.SRANDMEMBER:
case RedisCommand.SSUBSCRIBE:
case RedisCommand.STRLEN:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SUNION:
case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SSCAN:
case RedisCommand.SYNC:
case RedisCommand.TIME:
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/Interfaces/ISubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public interface ISubscriber : IRedis
/// See
/// <seealso href="https://redis.io/commands/unsubscribe"/>,
/// <seealso href="https://redis.io/commands/punsubscribe"/>.
/// <seealso href="https://redis.io/commands/sunsubscribe"/>.
/// </remarks>
void UnsubscribeAll(CommandFlags flags = CommandFlags.None);

Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ internal static bool RequiresDatabase(RedisCommand command)
case RedisCommand.SLAVEOF:
case RedisCommand.SLOWLOG:
case RedisCommand.SUBSCRIBE:
case RedisCommand.SPUBLISH:
case RedisCommand.SSUBSCRIBE:
case RedisCommand.SUNSUBSCRIBE:
case RedisCommand.SWAPDB:
case RedisCommand.SYNC:
case RedisCommand.TIME:
Expand Down
7 changes: 5 additions & 2 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,12 @@ public enum State : byte
public RedisCommand LastCommand { get; private set; }

/// <summary>
/// If we have a connection, report the protocol being used.
/// If we have (or had) a connection, report the protocol being used.
/// </summary>
public RedisProtocol? Protocol => physical?.Protocol;
/// <remarks>The value remains after disconnect, so that appropriate follow-up actions (pub/sub etc) can work reliably.</remarks>
public RedisProtocol? Protocol => _protocol == 0 ? default(RedisProtocol?) : _protocol;
private RedisProtocol _protocol; // note starts at zero, not RESP2
internal void SetProtocol(RedisProtocol protocol) => _protocol = protocol;

public void Dispose()
{
Expand Down
39 changes: 28 additions & 11 deletions src/StackExchange.Redis/PhysicalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable

private const int DefaultRedisDatabaseCount = 16;

private static readonly CommandBytes message = "message", pmessage = "pmessage";
private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";

private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select(
i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray();
Expand Down Expand Up @@ -276,7 +276,11 @@ private enum ReadMode : byte
private RedisProtocol _protocol; // note starts at **zero**, not RESP2
public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol;

internal void SetProtocol(RedisProtocol value) => _protocol = value;
internal void SetProtocol(RedisProtocol value)
{
_protocol = value;
BridgeCouldBeNull?.SetProtocol(value);
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")]
internal void Shutdown()
Expand Down Expand Up @@ -384,7 +388,7 @@ public void RecordConnectionFailed(
bool isInitialConnect = false,
IDuplexPipe? connectingPipe = null)
{
bool weAskedForThis = false;
bool weAskedForThis;
Exception? outerException = innerException;
IdentifyFailureType(innerException, ref failureType);
var bridge = BridgeCouldBeNull;
Expand Down Expand Up @@ -1644,9 +1648,9 @@ private void MatchResult(in RawResult result)

// out of band message does not match to a queued message
var items = result.GetItems();
if (items.Length >= 3 && items[0].IsEqual(message))
if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage)))
{
_readStatus = ReadStatus.PubSubMessage;
_readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;

// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry)
var configChanged = muxer.ConfigurationChangedChannel;
Expand All @@ -1668,8 +1672,17 @@ private void MatchResult(in RawResult result)
}

// invoke the handlers
var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
Trace("MESSAGE: " + channel);
RedisChannel channel;
if (items[0].IsEqual(message))
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None);
Trace("MESSAGE: " + channel);
}
else // see check on outer-if that restricts to message / smessage
{
channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded);
Trace("SMESSAGE: " + channel);
}
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[2], out var payload))
Expand All @@ -1690,27 +1703,30 @@ private void MatchResult(in RawResult result)
{
_readStatus = ReadStatus.PubSubPMessage;

var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal);
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

Trace("PMESSAGE: " + channel);
if (!channel.IsNull)
{
if (TryGetPubSubPayload(items[3], out var payload))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payload);
}
else if (TryGetMultiPubSubPayload(items[3], out var payloads))
{
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern);
var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);

_readStatus = ReadStatus.InvokePubSub;
muxer.OnMessage(sub, channel, payloads);
}
}
return; // AND STOP PROCESSING!
}

// if it didn't look like "[p]message", then we still need to process the pending queue
// if it didn't look like "[p|s]message", then we still need to process the pending queue
}
Trace("Matching result...");

Expand Down Expand Up @@ -2110,6 +2126,7 @@ internal enum ReadStatus
MatchResult,
PubSubMessage,
PubSubPMessage,
PubSubSMessage,
Reconfigure,
InvokePubSub,
ResponseSequenceCheck, // high-integrity mode only
Expand Down
5 changes: 5 additions & 0 deletions src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ StackExchange.Redis.RedisChannel
StackExchange.Redis.RedisChannel.Equals(StackExchange.Redis.RedisChannel other) -> bool
StackExchange.Redis.RedisChannel.IsNullOrEmpty.get -> bool
StackExchange.Redis.RedisChannel.IsPattern.get -> bool
StackExchange.Redis.RedisChannel.IsSharded.get -> bool
StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Auto = 0 -> StackExchange.Redis.RedisChannel.PatternMode
StackExchange.Redis.RedisChannel.PatternMode.Literal = 1 -> StackExchange.Redis.RedisChannel.PatternMode
Expand Down Expand Up @@ -1893,4 +1894,8 @@ virtual StackExchange.Redis.RedisResult.Length.get -> int
virtual StackExchange.Redis.RedisResult.this[int index].get -> StackExchange.Redis.RedisResult!
StackExchange.Redis.ConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.IConnectionMultiplexer.AddLibraryNameSuffix(string! suffix) -> void
StackExchange.Redis.RedisFeatures.ShardedPubSub.get -> bool
static StackExchange.Redis.RedisChannel.Sharded(byte[]? value) -> StackExchange.Redis.RedisChannel
static StackExchange.Redis.RedisChannel.Sharded(string! value) -> StackExchange.Redis.RedisChannel
StackExchange.Redis.ClientInfo.ShardedSubscriptionCount.get -> int
StackExchange.Redis.ConfigurationOptions.SetUserPfxCertificate(string! userCertificatePath, string? password = null) -> void
7 changes: 4 additions & 3 deletions src/StackExchange.Redis/RawResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,21 @@ public bool MoveNext()
}
public ReadOnlySequence<byte> Current { get; private set; }
}
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.PatternMode mode)
internal RedisChannel AsRedisChannel(byte[]? channelPrefix, RedisChannel.RedisChannelOptions options)
{
switch (Resp2TypeBulkString)
{
case ResultType.SimpleString:
case ResultType.BulkString:
if (channelPrefix == null)
{
return new RedisChannel(GetBlob(), mode);
return new RedisChannel(GetBlob(), options);
}
if (StartsWith(channelPrefix))
{
byte[] copy = Payload.Slice(channelPrefix.Length).ToArray();
return new RedisChannel(copy, mode);

return new RedisChannel(copy, options);
}
return default;
default:
Expand Down
52 changes: 42 additions & 10 deletions src/StackExchange.Redis/RedisChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,18 @@ namespace StackExchange.Redis
public readonly struct RedisChannel : IEquatable<RedisChannel>
{
internal readonly byte[]? Value;
internal readonly bool _isPatternBased;

internal readonly RedisChannelOptions Options;

[Flags]
internal enum RedisChannelOptions
{
None = 0,
Pattern = 1 << 0,
Sharded = 1 << 1,
}

internal RedisCommand PublishCommand => IsSharded ? RedisCommand.SPUBLISH : RedisCommand.PUBLISH;

/// <summary>
/// Indicates whether the channel-name is either null or a zero-length value.
Expand All @@ -19,7 +30,12 @@ namespace StackExchange.Redis
/// <summary>
/// Indicates whether this channel represents a wildcard pattern (see <c>PSUBSCRIBE</c>).
/// </summary>
public bool IsPattern => _isPatternBased;
public bool IsPattern => (Options & RedisChannelOptions.Pattern) != 0;

/// <summary>
/// Indicates whether this channel represents a shard channel (see <c>SSUBSCRIBE</c>).
/// </summary>
public bool IsSharded => (Options & RedisChannelOptions.Sharded) != 0;

internal bool IsNull => Value == null;

Expand Down Expand Up @@ -59,19 +75,35 @@ public static bool UseImplicitAutoPattern
/// </summary>
/// <param name="value">The name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode)) { }
public RedisChannel(byte[]? value, PatternMode mode) : this(value, DeterminePatternBased(value, mode) ? RedisChannelOptions.Pattern : RedisChannelOptions.None)
{
}

/// <summary>
/// Create a new redis channel from a string, explicitly controlling the pattern mode.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
/// <param name="mode">The mode for name matching.</param>
public RedisChannel(string value, PatternMode mode) : this(value == null ? null : Encoding.UTF8.GetBytes(value), mode) { }
public RedisChannel(string value, PatternMode mode) : this(value is null ? null : Encoding.UTF8.GetBytes(value), mode)
{
}

/// <summary>
/// Create a new redis channel from a buffer, representing a sharded channel.
/// </summary>
/// <param name="value">The name of the channel to create.</param>
public static RedisChannel Sharded(byte[]? value) => new(value, RedisChannelOptions.Sharded);

/// <summary>
/// Create a new redis channel from a string, representing a sharded channel.
/// </summary>
/// <param name="value">The string name of the channel to create.</param>
public static RedisChannel Sharded(string value) => new(value is null ? null : Encoding.UTF8.GetBytes(value), RedisChannelOptions.Sharded);

private RedisChannel(byte[]? value, bool isPatternBased)
internal RedisChannel(byte[]? value, RedisChannelOptions options)
{
Value = value;
_isPatternBased = isPatternBased;
Options = options;
}

private static bool DeterminePatternBased(byte[]? value, PatternMode mode) => mode switch
Expand Down Expand Up @@ -123,7 +155,7 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// <param name="x">The first <see cref="RedisChannel"/> to compare.</param>
/// <param name="y">The second <see cref="RedisChannel"/> to compare.</param>
public static bool operator ==(RedisChannel x, RedisChannel y) =>
x._isPatternBased == y._isPatternBased && RedisValue.Equals(x.Value, y.Value);
x.Options == y.Options && RedisValue.Equals(x.Value, y.Value);

/// <summary>
/// Indicate whether two channel names are equal.
Expand Down Expand Up @@ -171,10 +203,10 @@ private RedisChannel(byte[]? value, bool isPatternBased)
/// Indicate whether two channel names are equal.
/// </summary>
/// <param name="other">The <see cref="RedisChannel"/> to compare to.</param>
public bool Equals(RedisChannel other) => _isPatternBased == other._isPatternBased && RedisValue.Equals(Value, other.Value);
public bool Equals(RedisChannel other) => Options == other.Options && RedisValue.Equals(Value, other.Value);

/// <inheritdoc/>
public override int GetHashCode() => RedisValue.GetHashCode(Value) + (_isPatternBased ? 1 : 0);
public override int GetHashCode() => RedisValue.GetHashCode(Value) ^ (int)Options;

/// <summary>
/// Obtains a string representation of the channel name.
Expand Down Expand Up @@ -203,7 +235,7 @@ internal RedisChannel Clone()
return this;
}
var copy = (byte[])Value.Clone(); // defensive array copy
return new RedisChannel(copy, _isPatternBased);
return new RedisChannel(copy, Options);
}

/// <summary>
Expand Down
Loading
Loading