Skip to content

Commit

Permalink
Enable Blocking a Client on Keyspace Event Notification
Browse files Browse the repository at this point in the history
This change enhances user experience and consistency by allowing a
module to block a client on keyspace event notifications. Consistency
is improved by ensuring that reads after writes on the same connection
yield expected results. For example, in ValkeySearch, mutations processed
earlier on the same connection will be available for search.

The implementation extends `VM_BlockClient` to support blocking clients on
keyspace event notifications. Internal clients, LUA clients, clients issueing
multi exec and those with the `deny_blocking` flag set are not blocked.
Once blocked, a client’s reply is withheld until it is explicitly unblocked.

Signed-off-by: yairgott <[email protected]>
  • Loading branch information
yairgott committed Mar 5, 2025
1 parent 3f6581b commit 79f435d
Show file tree
Hide file tree
Showing 24 changed files with 435 additions and 158 deletions.
8 changes: 4 additions & 4 deletions src/bitops.c
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ void setbitCommand(client *c) {
byteval |= ((on & 0x1) << bit);
((uint8_t *)o->ptr)[byte] = byteval;
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
server.dirty++;
}

Expand Down Expand Up @@ -877,11 +877,11 @@ void bitopCommand(client *c) {
if (maxlen) {
o = createObject(OBJ_STRING, res);
setKey(c, c->db, targetkey, &o, 0);
notifyKeyspaceEvent(NOTIFY_STRING, "set", targetkey, c->db->id);
notifyKeyspaceEvent(c, NOTIFY_STRING, "set", targetkey, c->db->id);
server.dirty++;
} else if (dbDelete(c->db, targetkey)) {
signalModifiedKey(c, c->db, targetkey);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", targetkey, c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", targetkey, c->db->id);
server.dirty++;
}
addReplyLongLong(c, maxlen); /* Return the output string length in bytes. */
Expand Down Expand Up @@ -1361,7 +1361,7 @@ void bitfieldGeneric(client *c, int flags) {

if (changes) {
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
server.dirty += changes;
}
zfree(ops);
Expand Down
6 changes: 3 additions & 3 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ void restoreCommand(client *c) {
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
rewriteClientCommandVector(c, 2, aux, key);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", key, c->db->id);
server.dirty++;
}
decrRefCount(obj);
Expand All @@ -297,7 +297,7 @@ void restoreCommand(client *c) {
}
objectSetLRUOrLFU(obj, lfu_freq, lru_idle, lru_clock, 1000);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "restore", key, c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "restore", key, c->db->id);
addReply(c, shared.ok);
server.dirty++;
}
Expand Down Expand Up @@ -648,7 +648,7 @@ void migrateCommand(client *c) {
/* No COPY option: remove the local key, signal the change. */
dbDelete(c->db, kv[j]);
signalModifiedKey(c, c->db, kv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", kv[j], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", kv[j], c->db->id);
server.dirty++;

/* Populate the argument vector to replace the old one. */
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6453,7 +6453,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
/* The keys are not actually logically deleted from the database, just moved to another node.
* The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
moduleNotifyKeyspaceEvent(NULL, NOTIFY_GENERIC, "del", key, server.db[0].id);
exitExecutionUnit();
postExecutionUnitOperations();
decrRefCount(key);
Expand Down
20 changes: 10 additions & 10 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) server.stat_keyspace_hits++;
/* TODO: Use separate hits stats for WRITE */
} else {
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) notifyKeyspaceEvent(NULL, NOTIFY_KEY_MISS, "keymiss", key, db->id);
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) server.stat_keyspace_misses++;
/* TODO: Use separate misses stats and notify event for WRITE */
}
Expand Down Expand Up @@ -229,7 +229,7 @@ static void dbAddInternal(serverDb *db, robj *key, robj **valref, int update_if_
initObjectLRUOrLFU(val);
kvstoreHashtableAdd(db->keys, dict_index, val);
signalKeyAsReady(db, key, val->type);
notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
notifyKeyspaceEvent(NULL, NOTIFY_NEW, "new", key, db->id);
*valref = val;
}

Expand Down Expand Up @@ -827,7 +827,7 @@ void delGenericCommand(client *c, int lazy) {
int deleted = lazy ? dbAsyncDelete(c->db, c->argv[j]) : dbSyncDelete(c->db, c->argv[j]);
if (deleted) {
signalModifiedKey(c, c->db, c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[j], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", c->argv[j], c->db->id);
server.dirty++;
numdel++;
}
Expand Down Expand Up @@ -1409,8 +1409,8 @@ void renameGenericCommand(client *c, int nx) {
if (expire != -1) o = setExpire(c, c->db, c->argv[2], expire);
signalModifiedKey(c, c->db, c->argv[1]);
signalModifiedKey(c, c->db, c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "rename_from", c->argv[1], c->db->id);
notifyKeyspaceEvent(NOTIFY_GENERIC, "rename_to", c->argv[2], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "rename_from", c->argv[1], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "rename_to", c->argv[2], c->db->id);
server.dirty++;
addReply(c, nx ? shared.cone : shared.ok);
}
Expand Down Expand Up @@ -1477,8 +1477,8 @@ void moveCommand(client *c) {
/* OK! key moved */
signalModifiedKey(c, src, c->argv[1]);
signalModifiedKey(c, dst, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "move_from", c->argv[1], src->id);
notifyKeyspaceEvent(NOTIFY_GENERIC, "move_to", c->argv[1], dst->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "move_from", c->argv[1], src->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "move_to", c->argv[1], dst->id);

server.dirty++;
addReply(c, shared.cone);
Expand Down Expand Up @@ -1577,7 +1577,7 @@ void copyCommand(client *c) {

/* OK! key copied */
signalModifiedKey(c, dst, c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "copy_to", c->argv[2], dst->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "copy_to", c->argv[2], dst->id);

server.dirty++;
addReply(c, shared.cone);
Expand Down Expand Up @@ -1828,7 +1828,7 @@ void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int d
dbGenericDeleteWithDictIndex(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED, dict_index);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del", expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id);
notifyKeyspaceEvent(NULL, NOTIFY_EXPIRED, "expired", keyobj, db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db, keyobj, server.lazyfree_lazy_expire);
server.stat_expiredkeys++;
Expand All @@ -1850,7 +1850,7 @@ void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) {
robj *aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
rewriteClientCommandVector(c, 2, aux, keyobj);
signalModifiedKey(c, c->db, keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, c->db->id);
notifyKeyspaceEvent(NULL, NOTIFY_EXPIRED, "expired", keyobj, c->db->id);
server.stat_expiredkeys++;
}

Expand Down
2 changes: 1 addition & 1 deletion src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ int performEvictions(void) {
mem_freed += delta;
server.stat_evictedkeys++;
signalModifiedKey(NULL, db, keyobj);
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id);
notifyKeyspaceEvent(NULL, NOTIFY_EVICTED, "evicted", keyobj, db->id);
propagateDeletion(db, keyobj, server.lazyfree_lazy_eviction);
exitExecutionUnit();
postExecutionUnitOperations();
Expand Down
9 changes: 4 additions & 5 deletions src/expire.c
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,9 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
return;
} else {
obj = setExpire(c, c->db, key, when);
signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "expire", key, c->db->id);
server.dirty++;
addReply(c, shared.cone);
/* Propagate as PEXPIREAT millisecond-timestamp
* Only rewrite the command arg if not already PEXPIREAT */
Expand All @@ -695,10 +698,6 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
rewriteClientCommandArgument(c, 2, when_obj);
decrRefCount(when_obj);
}

signalModifiedKey(c, c->db, key);
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
server.dirty++;
return;
}
}
Expand Down Expand Up @@ -772,7 +771,7 @@ void persistCommand(client *c) {
if (lookupKeyWrite(c->db, c->argv[1])) {
if (removeExpire(c->db, c->argv[1])) {
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC, "persist", c->argv[1], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "persist", c->argv[1], c->db->id);
addReply(c, shared.cone);
server.dirty++;
} else {
Expand Down
6 changes: 3 additions & 3 deletions src/geo.c
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
/* store key is not NULL, try to delete it and return 0. */
if (dbDelete(c->db, storekey)) {
signalModifiedKey(c, c->db, storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", storekey, c->db->id);
server.dirty++;
}
addReply(c, shared.czero);
Expand Down Expand Up @@ -781,12 +781,12 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
if (returned_items) {
zsetConvertToListpackIfNeeded(zobj, maxelelen, totelelen);
setKey(c, c->db, storekey, &zobj, 0);
notifyKeyspaceEvent(NOTIFY_ZSET, flags & GEOSEARCH ? "geosearchstore" : "georadiusstore", storekey,
notifyKeyspaceEvent(c, NOTIFY_ZSET, flags & GEOSEARCH ? "geosearchstore" : "georadiusstore", storekey,
c->db->id);
server.dirty += returned_items;
} else if (dbDelete(c->db, storekey)) {
signalModifiedKey(c, c->db, storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id);
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", storekey, c->db->id);
server.dirty++;
}
addReplyLongLong(c, returned_items);
Expand Down
4 changes: 2 additions & 2 deletions src/hyperloglog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,7 @@ void pfaddCommand(client *c) {
if (updated) {
HLL_INVALIDATE_CACHE(hdr);
signalModifiedKey(c, c->db, c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
server.dirty += updated;
}
addReply(c, updated ? shared.cone : shared.czero);
Expand Down Expand Up @@ -1642,7 +1642,7 @@ void pfmergeCommand(client *c) {
signalModifiedKey(c, c->db, c->argv[1]);
/* We generate a PFADD event for PFMERGE for semantical simplicity
* since in theory this is a mass-add of elements. */
notifyKeyspaceEvent(NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
notifyKeyspaceEvent(c, NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
server.dirty++;
addReply(c, shared.ok);
}
Expand Down
56 changes: 50 additions & 6 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ typedef struct ValkeyModuleCtx ValkeyModuleCtx;
context is destroyed */
#define VALKEYMODULE_CTX_CHANNELS_POS_REQUEST (1 << 8)
#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */

#define VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION (1<<10) /* Context created a keyspace notification event */

/* This represents a key opened with VM_OpenKey(). */
struct ValkeyModuleKey {
Expand Down Expand Up @@ -7807,8 +7807,24 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
void *privdata,
int flags) {
client *c = ctx->client;
if (c->flag.blocked || getClientType(c) != CLIENT_TYPE_NORMAL || c->flag.deny_blocking) {
/* Early return if duplicate block attempt or client is not normal or
* client is set to deny blocking. */
return NULL;
}

int islua = scriptIsRunning();
int ismulti = server.in_exec;
if (ctx->flags & (VALKEYMODULE_CTX_TEMP_CLIENT | VALKEYMODULE_CTX_NEW_CLIENT)) {
/* Temporary clients can't be blocked */
return NULL;
}
int iskeyspacenotification = ctx->flags & (VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION);
if ((islua || ismulti) && iskeyspacenotification) {
/* Avoid blocking within transactions when context initiated by
* keyspace notification. */
return NULL;
}
initClientBlockingState(c);

c->bstate->module_blocked_handle = zmalloc(sizeof(ValkeyModuleBlockedClient));
Expand Down Expand Up @@ -7864,6 +7880,11 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
c->bstate->timeout = timeout;
blockClient(c, BLOCKED_MODULE);
}
/* Defer response until after being unblocked for a contex originated from
* keyspace notification events */
if (iskeyspacenotification) {
c->flag.blocked_by_module = 1;
}
}
return bc;
}
Expand Down Expand Up @@ -8099,6 +8120,16 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
*
* 1. If the client is a Lua script.
* 2. If the client is executing a MULTI block.
* 3. If the client is a temporary module client.
* 4. If the client is already blocked.
*
* In cases 1 and 2, a call to VlakeyModule_BlockClient() will **not** block the
* client, but instead produce a specific error reply. Note that if the
* BlockClient call originated from within a keyspace notification, no error
* reply is generated and nullptr is returned.
*
* In case 3 and 4, a call to ValkeyModule_BlockClient() are no-op and
* return nullptr.
*
* In these cases, a call to ValkeyModule_BlockClient() will **not** block the
* client, but instead produce a specific error reply.
Expand Down Expand Up @@ -8300,6 +8331,10 @@ int moduleClientIsBlockedOnKeys(client *c) {
* ValkeyModule_BlockClientOnKeys() is accessible from the timeout
* callback via VM_GetBlockedClientPrivateData). */
int VM_UnblockClient(ValkeyModuleBlockedClient *bc, void *privdata) {
if (!bc){
/* No-op if the blocked client is null. */
return VALKEYMODULE_OK;
}
if (bc->blocked_on_keys) {
/* In theory the user should always pass the timeout handler as an
* argument, but better to be safe than sorry. */
Expand Down Expand Up @@ -8396,7 +8431,10 @@ void moduleHandleBlockedClients(void) {
* replies to send to the client in a thread safe context.
* We need to glue such replies to the client output buffer and
* free the temporary client we just used for the replies. */
if (c) AddReplyFromClient(c, bc->reply_client);
if (c) {
c->flag.blocked_by_module = 0;
AddReplyFromClient(c, bc->reply_client);
}
moduleReleaseTempClient(bc->reply_client);
moduleReleaseTempClient(bc->thread_safe_ctx_client);

Expand Down Expand Up @@ -8836,14 +8874,14 @@ int VM_GetNotifyKeyspaceEvents(void) {
/* Expose notifyKeyspaceEvent to modules */
int VM_NotifyKeyspaceEvent(ValkeyModuleCtx *ctx, int type, const char *event, ValkeyModuleString *key) {
if (!ctx || !ctx->client) return VALKEYMODULE_ERR;
notifyKeyspaceEvent(type, (char *)event, key, ctx->client->db->id);
notifyKeyspaceEvent(NULL, type, (char *)event, key, ctx->client->db->id);
return VALKEYMODULE_OK;
}

/* Dispatcher for keyspace notifications to module subscriber functions.
* This gets called only if at least one module requested to be notified on
* keyspace notifications */
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
void moduleNotifyKeyspaceEvent(client *c, int type, const char *event, robj *key, int dbid) {
/* Don't do anything if there aren't any subscribers */
if (listLength(moduleKeyspaceSubscribers) == 0) return;

Expand Down Expand Up @@ -8880,8 +8918,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
if ((sub->event_mask & type) &&
(sub->active == 0 || (sub->module->options & VALKEYMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
ValkeyModuleCtx ctx;
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, dbid);
if (c == NULL) {
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, dbid);
} else {
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_NONE);
ctx.client = c;
}
ctx.flags |= VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION;

/* mark the handler as active to avoid reentrant loops.
* If the subscriber performs an action triggering itself,
Expand Down
2 changes: 1 addition & 1 deletion src/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ size_t moduleCount(void);
void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void moduleNotifyKeyspaceEvent(client *c, int type, const char *event, robj *key, int dbid);
void firePostExecutionUnitJobs(void);
void moduleCallCommandFilters(client *c);
void modulePostExecutionUnitOperations(void);
Expand Down
3 changes: 2 additions & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void putClientInPendingWriteQueue(client *c) {
int prepareClientToWrite(client *c) {
/* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */
if (c->flag.script || c->flag.module) return C_OK;
if (c->flag.script || c->flag.module || c->flag.blocked_by_module) return C_OK;

/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flag.close_asap) return C_ERR;
Expand Down Expand Up @@ -2579,6 +2579,7 @@ void resetClient(client *c) {
c->slot = -1;
c->flag.executing_command = 0;
c->flag.replication_done = 0;
c->flag.blocked_by_module = 0;
c->net_output_bytes_curr_cmd = 0;

/* Make sure the duration has been recorded to some command. */
Expand Down
7 changes: 4 additions & 3 deletions src/notify.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ sds keyspaceEventsFlagsToString(int flags) {

/* The API provided to the rest of the serer core is a simple function:
*
* notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
* notifyKeyspaceEvent(client *c, int type, char *event, robj *key, int dbid);
*
* 'client' is the client that triggered the keyspace event, or NULL.
* 'type' is the notification class we define in `server.h`.
* 'event' is a C string representing the event name.
* 'key' is an Object representing the key name.
* 'dbid' is the database ID where the key lives. */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
void notifyKeyspaceEvent(client *c, int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
Expand All @@ -112,7 +113,7 @@ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
* This bypasses the notifications configuration, but the module engine
* will only call event subscribers if the event type matches the types
* they are interested in. */
moduleNotifyKeyspaceEvent(type, event, key, dbid);
moduleNotifyKeyspaceEvent(c, type, event, key, dbid);

/* If notifications for this class of events are off, return ASAP. */
if (!(server.notify_keyspace_events & type)) return;
Expand Down
Loading

0 comments on commit 79f435d

Please sign in to comment.