Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for sharing memory between the module and the engine #1804

Open
wants to merge 2 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 107 additions & 16 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ struct AutoMemEntry {
#define VALKEYMODULE_AM_FREED 3 /* Explicitly freed by user already. */
#define VALKEYMODULE_AM_DICT 4
#define VALKEYMODULE_AM_INFO 5
#define VALKEYMODULE_AM_SHARED_SDS 6

/* The pool allocator block. Modules can allocate memory via this special
* allocator that will automatically release it all once the callback returns.
Expand Down Expand Up @@ -472,6 +473,7 @@ typedef int (*ValkeyModuleConfigSetBoolFunc)(const char *name, int val, void *pr
typedef int (*ValkeyModuleConfigSetEnumFunc)(const char *name, int val, void *privdata, ValkeyModuleString **err);
/* Apply signature, identical to valkeymodule.h */
typedef int (*ValkeyModuleConfigApplyFunc)(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err);
typedef void *(*ValkeyModuleSharedSDSAllocFunc)(size_t len, size_t *alloc);

/* Struct representing a module config. These are stored in a list in the module struct */
struct ModuleConfig {
Expand Down Expand Up @@ -515,6 +517,7 @@ static void zsetKeyReset(ValkeyModuleKey *key);
static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key);
void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d);
void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data);
void VM_ReleaseSharedSDS(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds);

/* Helpers for VM_SetCommandInfo. */
static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info);
Expand Down Expand Up @@ -2682,6 +2685,7 @@ void autoMemoryCollect(ValkeyModuleCtx *ctx) {
case VALKEYMODULE_AM_KEY: VM_CloseKey(ptr); break;
case VALKEYMODULE_AM_DICT: VM_FreeDict(NULL, ptr); break;
case VALKEYMODULE_AM_INFO: VM_FreeServerInfo(NULL, ptr); break;
case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(NULL, ptr); break;
}
}
ctx->flags |= VALKEYMODULE_CTX_AUTO_MEMORY;
Expand Down Expand Up @@ -5259,6 +5263,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) {
* are created.
* VALKEYMODULE_HASH_CFIELDS: The field names passed are null terminated C
* strings instead of ValkeyModuleString objects.
* VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects.
* VALKEYMODULE_HASH_COUNT_ALL: Include the number of inserted fields in the
* returned number, in addition to the number of
* updated and deleted fields. (Added in Redis OSS
Expand Down Expand Up @@ -5298,7 +5303,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) {
int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
va_list ap;
if (!key || (flags & ~(VALKEYMODULE_HASH_NX | VALKEYMODULE_HASH_XX | VALKEYMODULE_HASH_CFIELDS |
VALKEYMODULE_HASH_COUNT_ALL))) {
VALKEYMODULE_HASH_COUNT_ALL | VALKEYMODULE_HASH_SHAREBLE_VALUES))) {
errno = EINVAL;
return 0;
} else if (key->value && key->value->type != OBJ_HASH) {
Expand All @@ -5313,7 +5318,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
int count = 0;
va_start(ap, flags);
while (1) {
ValkeyModuleString *field, *value;
ValkeyModuleString *field, *value = NULL;
/* Get the field and value objects. */
if (flags & VALKEYMODULE_HASH_CFIELDS) {
char *cfield = va_arg(ap, char *);
Expand Down Expand Up @@ -5347,9 +5352,21 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
* to avoid a useless copy. */
if (flags & VALKEYMODULE_HASH_CFIELDS) low_flags |= HASH_SET_TAKE_FIELD;

robj *argv[2] = {field, value};
hashTypeTryConversion(key->value, argv, 0, 1);
int updated = hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
char *value_sds;
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
if (key->value->encoding == OBJ_ENCODING_LISTPACK) {
/* Convert to hashtable encoding, as list pack encoding performs a deep copy
* of the buffer, breaking ref-counting semantics. */
hashTypeConvert(key->value, OBJ_ENCODING_HASHTABLE);
}
value_sds = ((ValkeyModuleSharedSDS *)value)->buf;
} else {
value_sds = value->ptr;
robj *argv[2] = {field, value};
hashTypeTryConversion(key->value, argv, 0, 1);
}

int updated = hashTypeSet(key->value, field->ptr, value_sds, low_flags);
count += (flags & VALKEYMODULE_HASH_COUNT_ALL) ? 1 : updated;

/* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
Expand Down Expand Up @@ -5383,6 +5400,8 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
*
* VALKEYMODULE_HASH_CFIELDS: field names as null terminated C strings.
*
* VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects.
*
* VALKEYMODULE_HASH_EXISTS: instead of setting the value of the field
* expecting a ValkeyModuleString pointer to pointer, the function just
* reports if the field exists or not and expects an integer pointer
Expand Down Expand Up @@ -5412,7 +5431,7 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {

va_start(ap, flags);
while (1) {
ValkeyModuleString *field, **valueptr;
ValkeyModuleString *field;
int *existsptr;
/* Get the field object and the value pointer to pointer. */
if (flags & VALKEYMODULE_HASH_CFIELDS) {
Expand All @@ -5432,17 +5451,32 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {
else
*existsptr = 0;
} else {
valueptr = va_arg(ap, ValkeyModuleString **);
if (key->value) {
*valueptr = hashTypeGetValueObject(key->value, field->ptr);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
*valueptr = decoded;
}
if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr);
} else {
if (!key->value) {
ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **);
*valueptr = NULL;
} else {
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
ValkeyModuleSharedSDS **valueptr = va_arg(ap, ValkeyModuleSharedSDS **);
*valueptr = NULL;
/* shared SDS is supported only with hashtable encoding */
if (key->value->encoding == OBJ_ENCODING_HASHTABLE) {
sds value_sds = hashTypeGetFromHashTable(key->value, field->ptr);
if (value_sds && sdsType(value_sds) == SDS_TYPE_32_SHARED) {
*valueptr = (ValkeyModuleSharedSDS *)sdsAllocPtr(value_sds);
sdsRetain(*valueptr);
if (key->ctx != NULL) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr);
}
}
} else {
ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **);
*valueptr = hashTypeGetValueObject(key->value, field->ptr);
if (*valueptr) {
robj *decoded = getDecodedObject(*valueptr);
decrRefCount(*valueptr);
*valueptr = decoded;
}
if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr);
}
}
}

Expand Down Expand Up @@ -13256,6 +13290,60 @@ ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED;
}

/* --------------------------------------------------------------------------
* ## Shared SDS APIs
* -------------------------------------------------------------------------- */

/* Create a new module shared SDS object. The newly created SDS object's intrusive
* reference count is initialized to 1.
* The caller is responsible for invoking `ValkeyModule_ReleaseSharedSDS` when the
* object is no longer needed to ensure proper cleanup.
*
* Parameters:
* - `len`: Specifies the size of the allocated SDS buffer.
* - `allocfn`: A custom memory allocation function, allowing fine-grained control
* over the allocation strategy.
* - `freecbfn`: A callback function triggered on deallocation. Note that this does
* not free the object itself but is primarily used for statistical tracking.
*
* Returns:
* - A pointer to the created shared SDS object.
*/
ValkeyModuleSharedSDS *VM_CreateSharedSDS(ValkeyModuleCtx *ctx, size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) {
size_t alloc;
void *buf = allocfn(len + offsetof(sdshdr32shared, buf) + 1, &alloc);
ValkeyModuleSharedSDS *shared_sds = sdsInitShared((char *)buf, len, alloc, freecbfn);
if (ctx != NULL) autoMemoryAdd(ctx, VALKEYMODULE_AM_SHARED_SDS, shared_sds);
return shared_sds;
}

/* Retrieves the pointer to the shared SDS buffer along with its length.
*
* Parameters:
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object.
* - `len`: Output parameter that stores the length of the SDS buffer.
*
* Returns:
* - A pointer to the SDS buffer string.
*/
char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) {
*len = shared_sds->len;
return (char *)shared_sds + offsetof(sdshdr32shared, buf);
}

/* Releases a shared SDS object by decrementing its intrusive reference count.
*
* Every shared SDS object created by `VM_CreateSharedSDS` must be released
* using `VM_ReleaseSharedSDS` to ensure proper memory management.
*
* Parameters:
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object to be released.
*/
void VM_ReleaseSharedSDS(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds) {
int res = sdsReleaseShared(shared_sds);
if (ctx != NULL && res) autoMemoryFreed(ctx, VALKEYMODULE_AM_SHARED_SDS, shared_sds);
}

/* MODULE command.
*
* MODULE LIST
Expand Down Expand Up @@ -14130,4 +14218,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RegisterScriptingEngine);
REGISTER_API(UnregisterScriptingEngine);
REGISTER_API(GetFunctionExecutionState);
REGISTER_API(CreateSharedSDS);
REGISTER_API(SharedSDSPtrLen);
REGISTER_API(ReleaseSharedSDS);
}
77 changes: 74 additions & 3 deletions src/sds.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ int sdsHdrSize(char type) {
case SDS_TYPE_16: return sizeof(struct sdshdr16);
case SDS_TYPE_32: return sizeof(struct sdshdr32);
case SDS_TYPE_64: return sizeof(struct sdshdr64);
case SDS_TYPE_32_SHARED: return sizeof(struct sdshdr32shared);
}
return 0;
}
Expand All @@ -71,14 +72,15 @@ static inline size_t sdsTypeMaxSize(char type) {
if (type == SDS_TYPE_8) return (1 << 8) - 1;
if (type == SDS_TYPE_16) return (1 << 16) - 1;
#if (LONG_MAX == LLONG_MAX)
if (type == SDS_TYPE_32) return (1ll << 32) - 1;
if (type == SDS_TYPE_32 || type == SDS_TYPE_32_SHARED) return (1ll << 32) - 1;
#endif
return -1; /* this is equivalent to the max SDS_TYPE_64 or SDS_TYPE_32 */
}

static inline int adjustTypeIfNeeded(char *type, int *hdrlen, size_t bufsize) {
size_t usable = bufsize - *hdrlen - 1;
if (*type != SDS_TYPE_5 && usable > sdsTypeMaxSize(*type)) {
if (*type != SDS_TYPE_5 && *type != SDS_TYPE_32_SHARED &&
usable > sdsTypeMaxSize(*type)) {
*type = sdsReqType(usable);
*hdrlen = sdsHdrSize(*type);
return 1;
Expand Down Expand Up @@ -124,6 +126,7 @@ sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) {
* can't be greater than `sdsTypeMaxSize(type)`. */
sds sdswrite(char *buf, size_t bufsize, char type, const char *init, size_t initlen) {
assert(bufsize >= sdsReqSize(initlen, type));
assert(type != SDS_TYPE_32_SHARED);
int hdrlen = sdsHdrSize(type);
size_t usable = bufsize - hdrlen - 1;
sds s = buf + hdrlen;
Expand Down Expand Up @@ -204,8 +207,24 @@ sds sdsdup(const_sds s) {
/* Free an sds string. No operation is performed if 's' is NULL. */
void sdsfree(sds s) {
if (s == NULL) return;
if (sdsType(s) == SDS_TYPE_32_SHARED) {
SDS_HDR_VAR(32shared, s);
sdsReleaseShared(sh);
return;
}
s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s));
}
/* return 1 if the shared sds is freed otherwise 0 */
int sdsReleaseShared(sdshdrshared *sh) {
if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) {
return 0;
}
char *ptr = sh->buf + sh->len + 1 - sdsAllocSize(sh->buf);
size_t alloc_size = sdsAllocSize(sh->buf);
sh->freecbfn(ptr, alloc_size);
s_free_with_size(ptr, alloc_size);
return 1;
}

/* This variant of sdsfree() gets its argument as void, and is useful
* as free method in data structures that expect a 'void free_object(void*)'
Expand Down Expand Up @@ -266,6 +285,7 @@ sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
/* Return ASAP if there is enough space left. */
if (avail >= addlen) return s;

assert(oldtype != SDS_TYPE_32_SHARED);
len = sdslen(s);
sh = (char *)s - sdsHdrSize(oldtype);
reqlen = newlen = (len + addlen);
Expand Down Expand Up @@ -351,6 +371,7 @@ sds sdsResize(sds s, size_t size, int would_regrow) {

/* Return ASAP if the size is already good. */
if (sdsalloc(s) == size) return s;
assert(oldtype != SDS_TYPE_32_SHARED);

/* Truncate len if needed. */
if (size < len) len = size;
Expand Down Expand Up @@ -436,9 +457,51 @@ size_t sdsAllocSize(const_sds s) {
/* Return the pointer of the actual SDS allocation (normally SDS strings
* are referenced by the start of the string buffer). */
void *sdsAllocPtr(const_sds s) {
return (void *)(s - sdsHdrSize(sdsType(s)));
char type = sdsType(s);
if (type == SDS_TYPE_32_SHARED) {
return (void *)(s - offsetof(sdshdrshared, buf));
}
return (void *)(s - sdsHdrSize(type));
}

/* Initialize a shared sds into a buffer `buf`.
*
* Parameters:
* - `buf`: A pointer to the allocated buffer that is initialized a the shared sds.
* - `len`: The length of the sds buffer.
* - `alloc`: The total allocated size of the buffer, including metadata.
* - `freecbfn`: A callback function that is invoked when the sds object is released.
*
* Returns:
* - A pointer to the initialized `sdshdrshared` structure.
*
* Notes:
* - The caller is responsible for ensuring that `buf` is large enough to accommodate
* the SDS metadata, the string content, and the null terminator.
* - The `freecbfn` does not directly free memory but is used primarily for
* tracking deallocation events.
*/
sdshdrshared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) {
buf[alloc - 1] = 0;
sds s = buf + alloc - len - 1;
SDS_HDR_VAR(32shared, s);
sh->freecbfn = freecbfn;
atomic_init(&sh->refcount, 1);
sh->len = (uint32_t)len;
sh->alloc = alloc - sdsHdrSize(SDS_TYPE_32_SHARED) - 1;
sh->flags = SDS_TYPE_32_SHARED;
return sh;
}

/* Increases the reference count of a shared SDS object.
*
* Parameters:
* - `sh`: A pointer to the `sdshdrshared` structure whose reference count
* should be increased.
*/
void sdsRetain(sdshdrshared *sh) {
atomic_fetch_add_explicit(&sh->refcount, 1, memory_order_relaxed);
}
/* Increment the sds length and decrements the left free space at the
* end of the string according to 'incr'. Also set the null term
* in the new end of the string.
Expand Down Expand Up @@ -498,6 +561,13 @@ void sdsIncrLen(sds s, ssize_t incr) {
len = (sh->len += incr);
break;
}
case SDS_TYPE_32_SHARED: {
SDS_HDR_VAR(32shared, s);
assert((incr >= 0 && sh->alloc - sh->len >= (unsigned int)incr) ||
(incr < 0 && sh->len >= (unsigned int)(-incr)));
len = (sh->len += incr);
break;
}
default: len = 0; /* Just to avoid compilation warnings. */
}
s[len] = '\0';
Expand Down Expand Up @@ -779,6 +849,7 @@ sds sdstrim(sds s, const char *cset) {
char *end, *sp, *ep;
size_t len;

assert(sdsType(s) != SDS_TYPE_32_SHARED);
sp = s;
ep = end = s + sdslen(s) - 1;
while (sp <= end && strchr(cset, *sp)) sp++;
Expand Down
Loading
Loading