diff --git a/deps/common/redismodule.h b/deps/common/redismodule.h index 2f3bb858..c1299145 100644 --- a/deps/common/redismodule.h +++ b/deps/common/redismodule.h @@ -309,6 +309,9 @@ typedef uint64_t RedisModuleTimerID; * Use RedisModule_GetModuleOptionsAll instead. */ #define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4) +/* default set of items to flush in an execution unit */ +#define REDISMODULE_FLUSH_EXEC_UNIT_FLAG_DEFAULT (1<<0) + /* Definitions for RedisModule_SetCommandInfo. */ typedef enum { @@ -976,7 +979,7 @@ REDISMODULE_API int (*RedisModule_GetSelectedDb)(RedisModuleCtx *ctx) REDISMODUL REDISMODULE_API int (*RedisModule_SelectDb)(RedisModuleCtx *ctx, int newid) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KeyExists)(RedisModuleCtx *ctx, RedisModuleString *keyname) REDISMODULE_ATTR; REDISMODULE_API RedisModuleKey * (*RedisModule_OpenKey)(RedisModuleCtx *ctx, RedisModuleString *keyname, int mode) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetOpenKeyModesAll)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_CloseKey)(RedisModuleKey *kp) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KeyType)(RedisModuleKey *kp) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_ValueLength)(RedisModuleKey *kp) REDISMODULE_ATTR; @@ -986,6 +989,7 @@ REDISMODULE_API RedisModuleString * (*RedisModule_ListGet)(RedisModuleKey *key, REDISMODULE_API int (*RedisModule_ListSet)(RedisModuleKey *key, long index, RedisModuleString *value) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ListInsert)(RedisModuleKey *key, long index, RedisModuleString *value) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ListDelete)(RedisModuleKey *key, long index) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_FlushExecutionUnit)(RedisModuleCtx *ctx, unsigned int flags) REDISMODULE_ATTR; REDISMODULE_API RedisModuleCallReply * (*RedisModule_Call)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) REDISMODULE_ATTR; REDISMODULE_API const char * (*RedisModule_CallReplyProto)(RedisModuleCallReply *reply, size_t *len) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_FreeCallReply)(RedisModuleCallReply *reply) REDISMODULE_ATTR; @@ -1098,7 +1102,7 @@ REDISMODULE_API int (*RedisModule_SetClientNameById)(uint64_t id, RedisModuleStr REDISMODULE_API int (*RedisModule_PublishMessage)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_PublishMessageShard)(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetContextFlags)(RedisModuleCtx *ctx) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_AvoidReplicaTraffic)(void) REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_PoolAlloc)(RedisModuleCtx *ctx, size_t bytes) REDISMODULE_ATTR; REDISMODULE_API RedisModuleType * (*RedisModule_CreateDataType)(RedisModuleCtx *ctx, const char *name, int encver, RedisModuleTypeMethods *typemethods) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ModuleTypeSetValue)(RedisModuleKey *key, RedisModuleType *mt, void *value) REDISMODULE_ATTR; @@ -1201,17 +1205,17 @@ REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeys)(Redi REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, int flags) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx) REDISMODULE_ATTR; -REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)() REDISMODULE_ATTR; +REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ScanCursorRestart)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cursor) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetContextFlagsAll)(void) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)(void) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)(void) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetServerVersion)(void) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)(void) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR; REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR; REDISMODULE_API void * (*RedisModule_BlockClientGetPrivateData)(RedisModuleBlockedClient *blocked_client) REDISMODULE_ATTR; @@ -1234,7 +1238,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR; -REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)(void) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_RegisterClusterMessageReceiver)(RedisModuleCtx *ctx, uint8_t type, RedisModuleClusterMessageReceiver callback) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_SendClusterMessage)(RedisModuleCtx *ctx, const char *target_id, uint8_t type, const char *msg, uint32_t len) REDISMODULE_ATTR; @@ -1263,7 +1267,7 @@ REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *use REDISMODULE_API void (*RedisModule_SendChildHeartbeat)(double progress) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR; -REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR; +REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)(void) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocSize)(void* ptr) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocUsableSize)(void *ptr) REDISMODULE_ATTR; REDISMODULE_API size_t (*RedisModule_MallocSizeString)(RedisModuleString* str) REDISMODULE_ATTR; @@ -1379,6 +1383,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(StringToStreamID); REDISMODULE_GET_API(Call); REDISMODULE_GET_API(CallReplyProto); + REDISMODULE_GET_API(FlushExecutionUnit); REDISMODULE_GET_API(FreeCallReply); REDISMODULE_GET_API(CallReplyInteger); REDISMODULE_GET_API(CallReplyDouble); diff --git a/src/raft.c b/src/raft.c index eaff9d04..5b5ce37b 100644 --- a/src/raft.c +++ b/src/raft.c @@ -448,6 +448,18 @@ RedisModuleCallReply *RaftExecuteCommandArray(RedisRaftCtx *rr, exitRedisModuleCall(); rr->entered_eval = old_entered_eval; + /* Process blocked clients immediately after an RM_Call(). + * + * e.g. If above RM_Call() does LPUSH to a list, normally, we can reply + * to a blocked client waiting on that list. Though, Redis will not + * serve blocked clients until the end of its event loop iteration. This + * is not deterministic and RedisRaft nodes might diverge. For this + * reason, we enforce blocking client handling after each RM_Call() to + * make it deterministic in the cluster. + */ + int ret = RedisModule_FlushExecutionUnit(ctx, REDISMODULE_FLUSH_EXEC_UNIT_FLAG_DEFAULT); + RedisModule_Assert(ret == 0); + if (RedisModule_CallReplyType(reply) != REDISMODULE_REPLY_PROMISE) { /* reply to client if we didn't block */ if (req) { diff --git a/tests/integration/test_blocking.py b/tests/integration/test_blocking.py index 09d274d5..f2db758f 100644 --- a/tests/integration/test_blocking.py +++ b/tests/integration/test_blocking.py @@ -381,8 +381,6 @@ def test_blocking_with_disconnect(cluster): assert val == [b'1'] -@pytest.mark.skip(reason="skipping, as promise/handler isn't atomic " - "and abort is not working as expected") def test_blocking_with_timeout_after_unblock(cluster): cluster.create(3)