Skip to content

Commit

Permalink
Adding support for sharing memory between the module and the engine
Browse files Browse the repository at this point in the history
Sharing memory between the module and engine reduces memory overhead by eliminating
redundant copies of stored entries in the module. This is particularly beneficial
for search workloads that require indexing large volumes of stored data.

Shared SDS, a new data type, facilitates module-engine memory sharing with thread-safe
intrusive reference counting. It preserves SDS semantics and structure while adding
ref-counting and a free callback for statistics tracking.

New module APIs:

- VM_CreateSharedSDS: Creates a new Shared SDS.
- VM_SharedSDSPtrLen: Retrieves the raw buffer pointer and length of a Shared SDS.
- VM_ReleaseSharedSDS: Decreases the Shared SDS ref-count by 1.

Extended module APIs:

- VM_HashSet: Now supports setting a shared SDS in the hash.
- VM_HashGet: Retrieves a shared SDS and increments its ref-count by 1.
  • Loading branch information
yairgott committed Mar 1, 2025
1 parent 3f6581b commit 104a4dd
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 24 deletions.
120 changes: 104 additions & 16 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ struct AutoMemEntry {
#define VALKEYMODULE_AM_FREED 3 /* Explicitly freed by user already. */
#define VALKEYMODULE_AM_DICT 4
#define VALKEYMODULE_AM_INFO 5
#define VALKEYMODULE_AM_SHARED_SDS 6

/* The pool allocator block. Modules can allocate memory via this special
* allocator that will automatically release it all once the callback returns.
Expand Down Expand Up @@ -472,6 +473,7 @@ typedef int (*ValkeyModuleConfigSetBoolFunc)(const char *name, int val, void *pr
typedef int (*ValkeyModuleConfigSetEnumFunc)(const char *name, int val, void *privdata, ValkeyModuleString **err);
/* Apply signature, identical to valkeymodule.h */
typedef int (*ValkeyModuleConfigApplyFunc)(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err);
typedef void *(*ValkeyModuleSharedSDSAllocFunc)(size_t len, size_t *alloc);

/* Struct representing a module config. These are stored in a list in the module struct */
struct ModuleConfig {
Expand Down Expand Up @@ -515,6 +517,7 @@ static void zsetKeyReset(ValkeyModuleKey *key);
static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key);
void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d);
void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data);
void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds);

/* Helpers for VM_SetCommandInfo. */
static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info);
Expand Down Expand Up @@ -2682,6 +2685,7 @@ void autoMemoryCollect(ValkeyModuleCtx *ctx) {
case VALKEYMODULE_AM_KEY: VM_CloseKey(ptr); break;
case VALKEYMODULE_AM_DICT: VM_FreeDict(NULL, ptr); break;
case VALKEYMODULE_AM_INFO: VM_FreeServerInfo(NULL, ptr); break;
case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(ptr); break;
}
}
ctx->flags |= VALKEYMODULE_CTX_AUTO_MEMORY;
Expand Down Expand Up @@ -5259,6 +5263,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) {
* are created.
* VALKEYMODULE_HASH_CFIELDS: The field names passed are null terminated C
* strings instead of ValkeyModuleString objects.
* VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects.
* VALKEYMODULE_HASH_COUNT_ALL: Include the number of inserted fields in the
* returned number, in addition to the number of
* updated and deleted fields. (Added in Redis OSS
Expand Down Expand Up @@ -5298,7 +5303,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) {
int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
va_list ap;
if (!key || (flags & ~(VALKEYMODULE_HASH_NX | VALKEYMODULE_HASH_XX | VALKEYMODULE_HASH_CFIELDS |
VALKEYMODULE_HASH_COUNT_ALL))) {
VALKEYMODULE_HASH_COUNT_ALL | VALKEYMODULE_HASH_SHAREBLE_VALUES))) {
errno = EINVAL;
return 0;
} else if (key->value && key->value->type != OBJ_HASH) {
Expand All @@ -5313,7 +5318,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
int count = 0;
va_start(ap, flags);
while (1) {
ValkeyModuleString *field, *value;
ValkeyModuleString *field, *value = NULL;
/* Get the field and value objects. */
if (flags & VALKEYMODULE_HASH_CFIELDS) {
char *cfield = va_arg(ap, char *);
Expand Down Expand Up @@ -5347,9 +5352,21 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
* to avoid a useless copy. */
if (flags & VALKEYMODULE_HASH_CFIELDS) low_flags |= HASH_SET_TAKE_FIELD;

robj *argv[2] = {field, value};
hashTypeTryConversion(key->value, argv, 0, 1);
int updated = hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
char *value_sds;
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
if (key->value->encoding == OBJ_ENCODING_LISTPACK) {
/* Convert to hashtable encoding, as list pack encoding performs a deep copy
* of the buffer, breaking ref-counting semantics. */
hashTypeConvert(key->value, OBJ_ENCODING_HASHTABLE);
}
value_sds = ((ValkeyModuleSharedSDS *)value)->buf;
} else {
value_sds = value->ptr;
robj *argv[2] = {field, value};
hashTypeTryConversion(key->value, argv, 0, 1);
}

int updated = hashTypeSet(key->value, field->ptr, value_sds, low_flags);
count += (flags & VALKEYMODULE_HASH_COUNT_ALL) ? 1 : updated;

/* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
Expand Down Expand Up @@ -5383,6 +5400,8 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
*
* VALKEYMODULE_HASH_CFIELDS: field names as null terminated C strings.
*
* VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects.
*
* VALKEYMODULE_HASH_EXISTS: instead of setting the value of the field
* expecting a ValkeyModuleString pointer to pointer, the function just
* reports if the field exists or not and expects an integer pointer
Expand Down Expand Up @@ -5412,7 +5431,7 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {

va_start(ap, flags);
while (1) {
ValkeyModuleString *field, **valueptr;
ValkeyModuleString *field;
int *existsptr;
/* Get the field object and the value pointer to pointer. */
if (flags & VALKEYMODULE_HASH_CFIELDS) {
Expand All @@ -5432,17 +5451,32 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {
else
*existsptr = 0;
} else {
valueptr = va_arg(ap, ValkeyModuleString **);
if (key->value) {
*valueptr = hashTypeGetValueObject(key->value, field->ptr);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
*valueptr = decoded;
}
if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr);
} else {
if (!key->value) {
ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **);
*valueptr = NULL;
} else {
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
ValkeyModuleSharedSDS **valueptr = va_arg(ap, ValkeyModuleSharedSDS **);
*valueptr = NULL;
/* shared SDS is supported only with hashtable encoding */
if (key->value->encoding == OBJ_ENCODING_HASHTABLE) {
sds value_sds = hashTypeGetFromHashTable(key->value, field->ptr);
if (value_sds && sdsType(value_sds) == SDS_TYPE_32_SHARED) {
*valueptr = (ValkeyModuleSharedSDS *)(value_sds - sdsHdrSize(sdsType(value_sds)));
sdsRetain(*valueptr);
autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr);
}
}
} else {
ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **);
*valueptr = hashTypeGetValueObject(key->value, field->ptr);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
*valueptr = decoded;
}
if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr);
}
}
}

Expand Down Expand Up @@ -13256,6 +13290,57 @@ ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED;
}

/* --------------------------------------------------------------------------
* ## Shared SDS APIs
* -------------------------------------------------------------------------- */

/* Create a new module shared SDS object. The newly created SDS object's intrusive
* reference count is initialized to 1.
* The caller is responsible for invoking `ValkeyModule_ReleaseSharedSDS` when the
* object is no longer needed to ensure proper cleanup.
*
* Parameters:
* - `len`: Specifies the size of the allocated SDS buffer.
* - `allocfn`: A custom memory allocation function, allowing fine-grained control
* over the allocation strategy.
* - `freecbfn`: A callback function triggered on deallocation. Note that this does
* not free the object itself but is primarily used for statistical tracking.
*
* Returns:
* - A pointer to the created shared SDS object.
*/
ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) {
size_t alloc;
void *buf = allocfn(len + sizeof(ValkeyModuleSharedSDS) + 1, &alloc);
return sdsInitShared((char *)buf, len, alloc, freecbfn);
}

/* Retrieves the pointer to the shared SDS buffer along with its length.
*
* Parameters:
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object.
* - `len`: Output parameter that stores the length of the SDS buffer.
*
* Returns:
* - A pointer to the SDS buffer string.
*/
char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) {
*len = shared_sds->len;
return (char *)shared_sds + sizeof(ValkeyModuleSharedSDS);
}

/* Releases a shared SDS object by decrementing its intrusive reference count.
*
* Every shared SDS object created by `VM_CreateSharedSDS` must be released
* using `VM_ReleaseSharedSDS` to ensure proper memory management.
*
* Parameters:
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object to be released.
*/
void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds) {
sdsfree(shared_sds->buf);
}

/* MODULE command.
*
* MODULE LIST
Expand Down Expand Up @@ -14130,4 +14215,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RegisterScriptingEngine);
REGISTER_API(UnregisterScriptingEngine);
REGISTER_API(GetFunctionExecutionState);
REGISTER_API(CreateSharedSDS);
REGISTER_API(SharedSDSPtrLen);
REGISTER_API(ReleaseSharedSDS);
}
64 changes: 62 additions & 2 deletions src/sds.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ int sdsHdrSize(char type) {
case SDS_TYPE_16: return sizeof(struct sdshdr16);
case SDS_TYPE_32: return sizeof(struct sdshdr32);
case SDS_TYPE_64: return sizeof(struct sdshdr64);
case SDS_TYPE_32_SHARED: return sizeof(struct sdshdr32shared);
}
return 0;
}
Expand All @@ -71,14 +72,15 @@ static inline size_t sdsTypeMaxSize(char type) {
if (type == SDS_TYPE_8) return (1 << 8) - 1;
if (type == SDS_TYPE_16) return (1 << 16) - 1;
#if (LONG_MAX == LLONG_MAX)
if (type == SDS_TYPE_32) return (1ll << 32) - 1;
if (type == SDS_TYPE_32 || type == SDS_TYPE_32_SHARED) return (1ll << 32) - 1;
#endif
return -1; /* this is equivalent to the max SDS_TYPE_64 or SDS_TYPE_32 */
}

static inline int adjustTypeIfNeeded(char *type, int *hdrlen, size_t bufsize) {
size_t usable = bufsize - *hdrlen - 1;
if (*type != SDS_TYPE_5 && usable > sdsTypeMaxSize(*type)) {
if (*type != SDS_TYPE_5 && *type != SDS_TYPE_32_SHARED &&
usable > sdsTypeMaxSize(*type)) {
*type = sdsReqType(usable);
*hdrlen = sdsHdrSize(*type);
return 1;
Expand Down Expand Up @@ -124,6 +126,7 @@ sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) {
* can't be greater than `sdsTypeMaxSize(type)`. */
sds sdswrite(char *buf, size_t bufsize, char type, const char *init, size_t initlen) {
assert(bufsize >= sdsReqSize(initlen, type));
assert(type != SDS_TYPE_32_SHARED);
int hdrlen = sdsHdrSize(type);
size_t usable = bufsize - hdrlen - 1;
sds s = buf + hdrlen;
Expand Down Expand Up @@ -204,6 +207,15 @@ sds sdsdup(const_sds s) {
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
if (s == NULL) return;
if (sdsType(s) == SDS_TYPE_32_SHARED) {
SDS_HDR_VAR(32shared, s);
if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) {
return;
}
sh->freecbfn(sh, sdsAllocSize(s));
s_free_with_size(s + sh->len + 1 - sdsAllocSize(s), sdsAllocSize(s));
return;
}
s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s));
}

Expand Down Expand Up @@ -266,6 +278,7 @@ sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
/* Return ASAP if there is enough space left. */
if (avail >= addlen) return s;

assert(oldtype != SDS_TYPE_32_SHARED);
len = sdslen(s);
sh = (char *)s - sdsHdrSize(oldtype);
reqlen = newlen = (len + addlen);
Expand Down Expand Up @@ -351,6 +364,7 @@ sds sdsResize(sds s, size_t size, int would_regrow) {

/* Return ASAP if the size is already good. */
if (sdsalloc(s) == size) return s;
assert(oldtype != SDS_TYPE_32_SHARED);

/* Truncate len if needed. */
if (size < len) len = size;
Expand Down Expand Up @@ -439,6 +453,44 @@ void *sdsAllocPtr(const_sds s) {
return (void *)(s - sdsHdrSize(sdsType(s)));
}

/* Initialize a shared sds into a buffer `buf`.
*
* Parameters:
* - `buf`: A pointer to the allocated buffer that is initialized a the shared sds.
* - `len`: The length of the sds buffer.
* - `alloc`: The total allocated size of the buffer, including metadata.
* - `freecbfn`: A callback function that is invoked when the sds object is released.
*
* Returns:
* - A pointer to the initialized `sdshdr32shared` structure.
*
* Notes:
* - The caller is responsible for ensuring that `buf` is large enough to accommodate
* the SDS metadata, the string content, and the null terminator.
* - The `freecbfn` does not directly free memory but is used primarily for
* tracking deallocation events.
*/
sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) {
buf[alloc - 1] = 0;
sds s = buf + alloc - len - 1;
SDS_HDR_VAR(32shared, s);
sh->freecbfn = freecbfn;
atomic_init(&sh->refcount, 1);
sh->len = len;
sh->alloc = alloc - sdsHdrSize(SDS_TYPE_32_SHARED) - 1;
sh->flags = SDS_TYPE_32_SHARED;
return sh;
}

/* Increases the reference count of a shared SDS object.
*
* Parameters:
* - `sh`: A pointer to the `sdshdr32shared` structure whose reference count
* should be increased.
*/
void sdsRetain(sdshdr32shared *sh) {
atomic_fetch_add_explicit(&sh->refcount, 1, memory_order_relaxed);
}
/* Increment the sds length and decrements the left free space at the
* end of the string according to 'incr'. Also set the null term
* in the new end of the string.
Expand Down Expand Up @@ -498,6 +550,13 @@ void sdsIncrLen(sds s, ssize_t incr) {
len = (sh->len += incr);
break;
}
case SDS_TYPE_32_SHARED: {
SDS_HDR_VAR(32shared, s);
assert((incr >= 0 && sh->alloc - sh->len >= (unsigned int)incr) ||
(incr < 0 && sh->len >= (unsigned int)(-incr)));
len = (sh->len += incr);
break;
}
default: len = 0; /* Just to avoid compilation warnings. */
}
s[len] = '\0';
Expand Down Expand Up @@ -779,6 +838,7 @@ sds sdstrim(sds s, const char *cset) {
char *end, *sp, *ep;
size_t len;

assert(sdsType(s) != SDS_TYPE_32_SHARED);
sp = s;
ep = end = s + sdslen(s) - 1;
while (sp <= end && strchr(cset, *sp)) sp++;
Expand Down
Loading

0 comments on commit 104a4dd

Please sign in to comment.