From f9aad1a630b702fda02253aace35ed805c7325c1 Mon Sep 17 00:00:00 2001 From: yairgott Date: Sat, 1 Mar 2025 00:09:11 -0800 Subject: [PATCH 1/2] Adding support for sharing memory between the module and the engine Sharing memory between the module and engine reduces memory overhead by eliminating redundant copies of stored entries in the module. This is particularly beneficial for search workloads that require indexing large volumes of stored data. Shared SDS, a new data type, facilitates module-engine memory sharing with thread-safe intrusive reference counting. It preserves SDS semantics and structure while adding ref-counting and a free callback for statistics tracking. New module APIs: - VM_CreateSharedSDS: Creates a new Shared SDS. - VM_SharedSDSPtrLen: Retrieves the raw buffer pointer and length of a Shared SDS. - VM_ReleaseSharedSDS: Decreases the Shared SDS ref-count by 1. Extended module APIs: - VM_HashSet: Now supports setting a shared SDS in the hash. - VM_HashGet: Retrieves a shared SDS and increments its ref-count by 1. --- src/module.c | 120 +++++++++++++++++++++++++++++++++++++++------ src/sds.c | 64 +++++++++++++++++++++++- src/sds.h | 23 +++++++++ src/server.h | 1 + src/t_hash.c | 22 ++++++--- src/valkeymodule.h | 15 ++++++ 6 files changed, 221 insertions(+), 24 deletions(-) diff --git a/src/module.c b/src/module.c index 2da40e8dda..a0812eb178 100644 --- a/src/module.c +++ b/src/module.c @@ -122,6 +122,7 @@ struct AutoMemEntry { #define VALKEYMODULE_AM_FREED 3 /* Explicitly freed by user already. */ #define VALKEYMODULE_AM_DICT 4 #define VALKEYMODULE_AM_INFO 5 +#define VALKEYMODULE_AM_SHARED_SDS 6 /* The pool allocator block. Modules can allocate memory via this special * allocator that will automatically release it all once the callback returns. @@ -472,6 +473,7 @@ typedef int (*ValkeyModuleConfigSetBoolFunc)(const char *name, int val, void *pr typedef int (*ValkeyModuleConfigSetEnumFunc)(const char *name, int val, void *privdata, ValkeyModuleString **err); /* Apply signature, identical to valkeymodule.h */ typedef int (*ValkeyModuleConfigApplyFunc)(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err); +typedef void *(*ValkeyModuleSharedSDSAllocFunc)(size_t len, size_t *alloc); /* Struct representing a module config. These are stored in a list in the module struct */ struct ModuleConfig { @@ -515,6 +517,7 @@ static void zsetKeyReset(ValkeyModuleKey *key); static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key); void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d); void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data); +void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds); /* Helpers for VM_SetCommandInfo. */ static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info); @@ -2682,6 +2685,7 @@ void autoMemoryCollect(ValkeyModuleCtx *ctx) { case VALKEYMODULE_AM_KEY: VM_CloseKey(ptr); break; case VALKEYMODULE_AM_DICT: VM_FreeDict(NULL, ptr); break; case VALKEYMODULE_AM_INFO: VM_FreeServerInfo(NULL, ptr); break; + case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(ptr); break; } } ctx->flags |= VALKEYMODULE_CTX_AUTO_MEMORY; @@ -5259,6 +5263,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) { * are created. * VALKEYMODULE_HASH_CFIELDS: The field names passed are null terminated C * strings instead of ValkeyModuleString objects. + * VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects. * VALKEYMODULE_HASH_COUNT_ALL: Include the number of inserted fields in the * returned number, in addition to the number of * updated and deleted fields. (Added in Redis OSS @@ -5298,7 +5303,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) { int VM_HashSet(ValkeyModuleKey *key, int flags, ...) { va_list ap; if (!key || (flags & ~(VALKEYMODULE_HASH_NX | VALKEYMODULE_HASH_XX | VALKEYMODULE_HASH_CFIELDS | - VALKEYMODULE_HASH_COUNT_ALL))) { + VALKEYMODULE_HASH_COUNT_ALL | VALKEYMODULE_HASH_SHAREBLE_VALUES))) { errno = EINVAL; return 0; } else if (key->value && key->value->type != OBJ_HASH) { @@ -5313,7 +5318,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) { int count = 0; va_start(ap, flags); while (1) { - ValkeyModuleString *field, *value; + ValkeyModuleString *field, *value = NULL; /* Get the field and value objects. */ if (flags & VALKEYMODULE_HASH_CFIELDS) { char *cfield = va_arg(ap, char *); @@ -5347,9 +5352,21 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) { * to avoid a useless copy. */ if (flags & VALKEYMODULE_HASH_CFIELDS) low_flags |= HASH_SET_TAKE_FIELD; - robj *argv[2] = {field, value}; - hashTypeTryConversion(key->value, argv, 0, 1); - int updated = hashTypeSet(key->value, field->ptr, value->ptr, low_flags); + char *value_sds; + if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) { + if (key->value->encoding == OBJ_ENCODING_LISTPACK) { + /* Convert to hashtable encoding, as list pack encoding performs a deep copy + * of the buffer, breaking ref-counting semantics. */ + hashTypeConvert(key->value, OBJ_ENCODING_HASHTABLE); + } + value_sds = ((ValkeyModuleSharedSDS *)value)->buf; + } else { + value_sds = value->ptr; + robj *argv[2] = {field, value}; + hashTypeTryConversion(key->value, argv, 0, 1); + } + + int updated = hashTypeSet(key->value, field->ptr, value_sds, low_flags); count += (flags & VALKEYMODULE_HASH_COUNT_ALL) ? 1 : updated; /* If CFIELDS is active, SDS string ownership is now of hashTypeSet(), @@ -5383,6 +5400,8 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) { * * VALKEYMODULE_HASH_CFIELDS: field names as null terminated C strings. * + * VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects. + * * VALKEYMODULE_HASH_EXISTS: instead of setting the value of the field * expecting a ValkeyModuleString pointer to pointer, the function just * reports if the field exists or not and expects an integer pointer @@ -5412,7 +5431,7 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) { va_start(ap, flags); while (1) { - ValkeyModuleString *field, **valueptr; + ValkeyModuleString *field; int *existsptr; /* Get the field object and the value pointer to pointer. */ if (flags & VALKEYMODULE_HASH_CFIELDS) { @@ -5432,17 +5451,32 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) { else *existsptr = 0; } else { - valueptr = va_arg(ap, ValkeyModuleString **); - if (key->value) { - *valueptr = hashTypeGetValueObject(key->value, field->ptr); - if (*valueptr) { - robj *decoded = getDecodedObject(*valueptr); - decrRefCount(*valueptr); - *valueptr = decoded; - } - if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr); - } else { + if (!key->value) { + ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **); *valueptr = NULL; + } else { + if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) { + ValkeyModuleSharedSDS **valueptr = va_arg(ap, ValkeyModuleSharedSDS **); + *valueptr = NULL; + /* shared SDS is supported only with hashtable encoding */ + if (key->value->encoding == OBJ_ENCODING_HASHTABLE) { + sds value_sds = hashTypeGetFromHashTable(key->value, field->ptr); + if (value_sds && sdsType(value_sds) == SDS_TYPE_32_SHARED) { + *valueptr = (ValkeyModuleSharedSDS *)(value_sds - sdsHdrSize(sdsType(value_sds))); + sdsRetain(*valueptr); + autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr); + } + } + } else { + ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **); + *valueptr = hashTypeGetValueObject(key->value, field->ptr); + if (*valueptr) { + robj *decoded = getDecodedObject(*valueptr); + decrRefCount(*valueptr); + *valueptr = decoded; + } + if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr); + } } } @@ -13256,6 +13290,57 @@ ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState( return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED; } +/* -------------------------------------------------------------------------- + * ## Shared SDS APIs + * -------------------------------------------------------------------------- */ + +/* Create a new module shared SDS object. The newly created SDS object's intrusive + * reference count is initialized to 1. + * The caller is responsible for invoking `ValkeyModule_ReleaseSharedSDS` when the + * object is no longer needed to ensure proper cleanup. + * + * Parameters: + * - `len`: Specifies the size of the allocated SDS buffer. + * - `allocfn`: A custom memory allocation function, allowing fine-grained control + * over the allocation strategy. + * - `freecbfn`: A callback function triggered on deallocation. Note that this does + * not free the object itself but is primarily used for statistical tracking. + * + * Returns: + * - A pointer to the created shared SDS object. + */ +ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) { + size_t alloc; + void *buf = allocfn(len + sizeof(ValkeyModuleSharedSDS) + 1, &alloc); + return sdsInitShared((char *)buf, len, alloc, freecbfn); +} + +/* Retrieves the pointer to the shared SDS buffer along with its length. + * + * Parameters: + * - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object. + * - `len`: Output parameter that stores the length of the SDS buffer. + * + * Returns: + * - A pointer to the SDS buffer string. + */ +char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) { + *len = shared_sds->len; + return (char *)shared_sds + sizeof(ValkeyModuleSharedSDS); +} + +/* Releases a shared SDS object by decrementing its intrusive reference count. + * + * Every shared SDS object created by `VM_CreateSharedSDS` must be released + * using `VM_ReleaseSharedSDS` to ensure proper memory management. + * + * Parameters: + * - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object to be released. + */ +void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds) { + sdsfree(shared_sds->buf); +} + /* MODULE command. * * MODULE LIST @@ -14130,4 +14215,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(RegisterScriptingEngine); REGISTER_API(UnregisterScriptingEngine); REGISTER_API(GetFunctionExecutionState); + REGISTER_API(CreateSharedSDS); + REGISTER_API(SharedSDSPtrLen); + REGISTER_API(ReleaseSharedSDS); } diff --git a/src/sds.c b/src/sds.c index 2f40c9dc9c..95222d915c 100644 --- a/src/sds.c +++ b/src/sds.c @@ -48,6 +48,7 @@ int sdsHdrSize(char type) { case SDS_TYPE_16: return sizeof(struct sdshdr16); case SDS_TYPE_32: return sizeof(struct sdshdr32); case SDS_TYPE_64: return sizeof(struct sdshdr64); + case SDS_TYPE_32_SHARED: return sizeof(struct sdshdr32shared); } return 0; } @@ -71,14 +72,15 @@ static inline size_t sdsTypeMaxSize(char type) { if (type == SDS_TYPE_8) return (1 << 8) - 1; if (type == SDS_TYPE_16) return (1 << 16) - 1; #if (LONG_MAX == LLONG_MAX) - if (type == SDS_TYPE_32) return (1ll << 32) - 1; + if (type == SDS_TYPE_32 || type == SDS_TYPE_32_SHARED) return (1ll << 32) - 1; #endif return -1; /* this is equivalent to the max SDS_TYPE_64 or SDS_TYPE_32 */ } static inline int adjustTypeIfNeeded(char *type, int *hdrlen, size_t bufsize) { size_t usable = bufsize - *hdrlen - 1; - if (*type != SDS_TYPE_5 && usable > sdsTypeMaxSize(*type)) { + if (*type != SDS_TYPE_5 && *type != SDS_TYPE_32_SHARED && + usable > sdsTypeMaxSize(*type)) { *type = sdsReqType(usable); *hdrlen = sdsHdrSize(*type); return 1; @@ -124,6 +126,7 @@ sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) { * can't be greater than `sdsTypeMaxSize(type)`. */ sds sdswrite(char *buf, size_t bufsize, char type, const char *init, size_t initlen) { assert(bufsize >= sdsReqSize(initlen, type)); + assert(type != SDS_TYPE_32_SHARED); int hdrlen = sdsHdrSize(type); size_t usable = bufsize - hdrlen - 1; sds s = buf + hdrlen; @@ -204,6 +207,15 @@ sds sdsdup(const_sds s) { /* Free an sds string. No operation is performed if 's' is NULL. */ void sdsfree(sds s) { if (s == NULL) return; + if (sdsType(s) == SDS_TYPE_32_SHARED) { + SDS_HDR_VAR(32shared, s); + if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) { + return; + } + sh->freecbfn(sh, sdsAllocSize(s)); + s_free_with_size(s + sh->len + 1 - sdsAllocSize(s), sdsAllocSize(s)); + return; + } s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s)); } @@ -266,6 +278,7 @@ sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) { /* Return ASAP if there is enough space left. */ if (avail >= addlen) return s; + assert(oldtype != SDS_TYPE_32_SHARED); len = sdslen(s); sh = (char *)s - sdsHdrSize(oldtype); reqlen = newlen = (len + addlen); @@ -351,6 +364,7 @@ sds sdsResize(sds s, size_t size, int would_regrow) { /* Return ASAP if the size is already good. */ if (sdsalloc(s) == size) return s; + assert(oldtype != SDS_TYPE_32_SHARED); /* Truncate len if needed. */ if (size < len) len = size; @@ -439,6 +453,44 @@ void *sdsAllocPtr(const_sds s) { return (void *)(s - sdsHdrSize(sdsType(s))); } +/* Initialize a shared sds into a buffer `buf`. + * + * Parameters: + * - `buf`: A pointer to the allocated buffer that is initialized a the shared sds. + * - `len`: The length of the sds buffer. + * - `alloc`: The total allocated size of the buffer, including metadata. + * - `freecbfn`: A callback function that is invoked when the sds object is released. + * + * Returns: + * - A pointer to the initialized `sdshdr32shared` structure. + * + * Notes: + * - The caller is responsible for ensuring that `buf` is large enough to accommodate + * the SDS metadata, the string content, and the null terminator. + * - The `freecbfn` does not directly free memory but is used primarily for + * tracking deallocation events. + */ +sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) { + buf[alloc - 1] = 0; + sds s = buf + alloc - len - 1; + SDS_HDR_VAR(32shared, s); + sh->freecbfn = freecbfn; + atomic_init(&sh->refcount, 1); + sh->len = len; + sh->alloc = alloc - sdsHdrSize(SDS_TYPE_32_SHARED) - 1; + sh->flags = SDS_TYPE_32_SHARED; + return sh; +} + +/* Increases the reference count of a shared SDS object. + * + * Parameters: + * - `sh`: A pointer to the `sdshdr32shared` structure whose reference count + * should be increased. + */ +void sdsRetain(sdshdr32shared *sh) { + atomic_fetch_add_explicit(&sh->refcount, 1, memory_order_relaxed); +} /* Increment the sds length and decrements the left free space at the * end of the string according to 'incr'. Also set the null term * in the new end of the string. @@ -498,6 +550,13 @@ void sdsIncrLen(sds s, ssize_t incr) { len = (sh->len += incr); break; } + case SDS_TYPE_32_SHARED: { + SDS_HDR_VAR(32shared, s); + assert((incr >= 0 && sh->alloc - sh->len >= (unsigned int)incr) || + (incr < 0 && sh->len >= (unsigned int)(-incr))); + len = (sh->len += incr); + break; + } default: len = 0; /* Just to avoid compilation warnings. */ } s[len] = '\0'; @@ -779,6 +838,7 @@ sds sdstrim(sds s, const char *cset) { char *end, *sp, *ep; size_t len; + assert(sdsType(s) != SDS_TYPE_32_SHARED); sp = s; ep = end = s + sdslen(s) - 1; while (sp <= end && strchr(cset, *sp)) sp++; diff --git a/src/sds.h b/src/sds.h index 09102d2567..e69bdc5ca4 100644 --- a/src/sds.h +++ b/src/sds.h @@ -38,6 +38,7 @@ extern const char *SDS_NOINIT; #include #include #include +#include /* Constness: * @@ -48,6 +49,8 @@ extern const char *SDS_NOINIT; typedef char *sds; typedef const char *const_sds; +typedef void (*sharedSdsFreeCB)(void *, size_t); + /* Note: sdshdr5 is never used, we just access the flags byte directly. * However is here to document the layout of type 5 SDS strings. */ struct __attribute__((__packed__)) sdshdr5 { @@ -72,6 +75,14 @@ struct __attribute__((__packed__)) sdshdr32 { unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; }; +typedef struct __attribute__((__packed__)) sdshdr32shared { + atomic_int refcount __attribute__((aligned(4))); + sharedSdsFreeCB freecbfn; + uint32_t len; /* used */ + uint32_t alloc; /* excluding the header and null terminator */ + unsigned char flags; /* 3 lsb of type, 5 unused bits */ + char buf[]; +} sdshdr32shared; struct __attribute__((__packed__)) sdshdr64 { uint64_t len; /* used */ uint64_t alloc; /* excluding the header and null terminator */ @@ -84,6 +95,7 @@ struct __attribute__((__packed__)) sdshdr64 { #define SDS_TYPE_16 2 #define SDS_TYPE_32 3 #define SDS_TYPE_64 4 +#define SDS_TYPE_32_SHARED 5 #define SDS_TYPE_MASK 7 #define SDS_TYPE_BITS 3 #define SDS_HDR_VAR(T, s) struct sdshdr##T *sh = (void *)((s) - (sizeof(struct sdshdr##T))); @@ -124,6 +136,7 @@ static inline size_t sdslen(const_sds s) { case SDS_TYPE_16: return SDS_HDR(16, s)->len; case SDS_TYPE_32: return SDS_HDR(32, s)->len; case SDS_TYPE_64: return SDS_HDR(64, s)->len; + case SDS_TYPE_32_SHARED: return SDS_HDR(32shared, s)->len; } return 0; } @@ -150,6 +163,10 @@ static inline size_t sdsavail(const_sds s) { SDS_HDR_VAR(64, s); return sh->alloc - sh->len; } + case SDS_TYPE_32_SHARED: { + SDS_HDR_VAR(32shared, s); + return sh->alloc - sh->len; + } } return 0; } @@ -165,6 +182,7 @@ static inline void sdssetlen(sds s, size_t newlen) { case SDS_TYPE_16: SDS_HDR(16, s)->len = newlen; break; case SDS_TYPE_32: SDS_HDR(32, s)->len = newlen; break; case SDS_TYPE_64: SDS_HDR(64, s)->len = newlen; break; + case SDS_TYPE_32_SHARED: SDS_HDR(32shared, s)->len = newlen; break; } } @@ -180,6 +198,7 @@ static inline void sdsinclen(sds s, size_t inc) { case SDS_TYPE_16: SDS_HDR(16, s)->len += inc; break; case SDS_TYPE_32: SDS_HDR(32, s)->len += inc; break; case SDS_TYPE_64: SDS_HDR(64, s)->len += inc; break; + case SDS_TYPE_32_SHARED: SDS_HDR(32shared, s)->len += inc; break; } } @@ -192,6 +211,7 @@ static inline size_t sdsalloc(const_sds s) { case SDS_TYPE_16: return SDS_HDR(16, s)->alloc; case SDS_TYPE_32: return SDS_HDR(32, s)->alloc; case SDS_TYPE_64: return SDS_HDR(64, s)->alloc; + case SDS_TYPE_32_SHARED: return SDS_HDR(32shared, s)->alloc; } return 0; } @@ -206,6 +226,7 @@ static inline void sdssetalloc(sds s, size_t newlen) { case SDS_TYPE_16: SDS_HDR(16, s)->alloc = newlen; break; case SDS_TYPE_32: SDS_HDR(32, s)->alloc = newlen; break; case SDS_TYPE_64: SDS_HDR(64, s)->alloc = newlen; break; + case SDS_TYPE_32_SHARED: SDS_HDR(32shared, s)->alloc = newlen; break; } } @@ -268,6 +289,8 @@ sds sdsRemoveFreeSpace(sds s, int would_regrow); sds sdsResize(sds s, size_t size, int would_regrow); size_t sdsAllocSize(const_sds s); void *sdsAllocPtr(const_sds s); +sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn); +void sdsRetain(sdshdr32shared *sh); /* Returns the minimum required size to store an sds string of the given length * and type. */ diff --git a/src/server.h b/src/server.h index 42856e4e57..f574cf4a2f 100644 --- a/src/server.h +++ b/src/server.h @@ -3275,6 +3275,7 @@ sds hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); robj *hashTypeLookupWriteOrCreate(client *c, robj *key); robj *hashTypeGetValueObject(robj *o, sds field); +sds hashTypeGetFromHashTable(robj *o, sds field); int hashTypeSet(robj *o, sds field, sds value, int flags); robj *hashTypeDup(robj *o); diff --git a/src/t_hash.c b/src/t_hash.c index 6b6ab9bc19..5c9c3ab300 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -97,7 +97,7 @@ hashTypeEntry *hashTypeCreateEntry(sds field, sds value) { size_t value_len = sdslen(value); size_t value_size = sdsReqSize(value_len, SDS_TYPE_8); sds embedded_field_sds; - if (field_size + value_size <= EMBED_VALUE_MAX_ALLOC_SIZE) { + if (sdsType(value) != SDS_TYPE_32_SHARED && field_size + value_size <= EMBED_VALUE_MAX_ALLOC_SIZE) { /* Embed field and value. Value is fixed to SDS_TYPE_8. Unused * allocation space is recorded in the embedded value's SDS header. * @@ -134,6 +134,10 @@ hashTypeEntry *hashTypeCreateEntry(sds field, sds value) { /* Store the entry encoding type in sds aux bits. */ sdsSetAuxBit(embedded_field_sds, FIELD_SDS_AUX_BIT_ENTRY_HAS_VALUE_PTR, 1); serverAssert(entryHasValuePtr(embedded_field_sds)); + if (sdsType(value) == SDS_TYPE_32_SHARED) { + SDS_HDR_VAR(32shared, value); + sdsRetain(sh); + } } return (void *)embedded_field_sds; } @@ -175,7 +179,7 @@ static hashTypeEntry *hashTypeEntryReplaceValue(hashTypeEntry *entry, sds value) char *alloc_ptr = sdsAllocPtr(entry); size_t required_size = field_size + value_size; size_t alloc_size; - if (required_size <= EMBED_VALUE_MAX_ALLOC_SIZE && + if (sdsType(value) != SDS_TYPE_32_SHARED && required_size <= EMBED_VALUE_MAX_ALLOC_SIZE && required_size <= (alloc_size = hashTypeEntryMemUsage(entry)) && required_size >= alloc_size * 3 / 4) { /* It fits in the allocation and leaves max 25% unused space. */ @@ -188,7 +192,7 @@ static hashTypeEntry *hashTypeEntryReplaceValue(hashTypeEntry *entry, sds value) return new_entry; } else { /* The value pointer is located before the embedded field. */ - if (field_size + value_size <= EMBED_VALUE_MAX_ALLOC_SIZE) { + if (sdsType(value) != SDS_TYPE_32_SHARED && field_size + value_size <= EMBED_VALUE_MAX_ALLOC_SIZE) { /* Convert to entry with embedded value. */ hashTypeEntry *new_entry = hashTypeCreateEntry(field, value); freeHashTypeEntry(entry); @@ -198,6 +202,10 @@ static hashTypeEntry *hashTypeEntryReplaceValue(hashTypeEntry *entry, sds value) sds *value_ref = hashTypeEntryGetValueRef(entry); sdsfree(*value_ref); *value_ref = value; + if (sdsType(value) == SDS_TYPE_32_SHARED) { + SDS_HDR_VAR(32shared, value); + sdsRetain(sh); + } return entry; } } @@ -232,8 +240,10 @@ size_t hashTypeEntryMemUsage(hashTypeEntry *entry) { hashTypeEntry *hashTypeEntryDefrag(hashTypeEntry *entry, void *(*defragfn)(void *), sds (*sdsdefragfn)(sds)) { if (entryHasValuePtr(entry)) { sds *value_ref = hashTypeEntryGetValueRef(entry); - sds new_value = sdsdefragfn(*value_ref); - if (new_value) *value_ref = new_value; + if (sdsType(*value_ref) != SDS_TYPE_32_SHARED) { + sds new_value = sdsdefragfn(*value_ref); + if (new_value) *value_ref = new_value; + } } char *allocation = hashTypeEntryAllocPtr(entry); char *new_allocation = defragfn(allocation); @@ -461,7 +471,7 @@ int hashTypeSet(robj *o, sds field, sds value, int flags) { hashtable *ht = o->ptr; sds v; - if (flags & HASH_SET_TAKE_VALUE) { + if (flags & HASH_SET_TAKE_VALUE || sdsType(value) == SDS_TYPE_32_SHARED) { v = value; value = NULL; } else { diff --git a/src/valkeymodule.h b/src/valkeymodule.h index 070b526e39..6b4becd4ab 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -136,6 +136,7 @@ typedef long long ustime_t; #define VALKEYMODULE_HASH_CFIELDS (1 << 2) #define VALKEYMODULE_HASH_EXISTS (1 << 3) #define VALKEYMODULE_HASH_COUNT_ALL (1 << 4) +#define VALKEYMODULE_HASH_SHAREBLE_VALUES (1 << 5) #define VALKEYMODULE_CONFIG_DEFAULT 0 /* This is the default for a module config. */ #define VALKEYMODULE_CONFIG_IMMUTABLE (1ULL << 0) /* Can this value only be set at startup? */ @@ -788,6 +789,10 @@ typedef enum { } ValkeyModuleACLLogEntryReason; /* Incomplete structures needed by both the core and modules. */ +struct sdshdr32shared; +#define ValkeyModuleSharedSDS sdshdr32shared +#define ValkeyModuleSharedSDSFreeCBFunc sharedSdsFreeCB +typedef struct sdshdr32shared sdshdr32shared; typedef struct ValkeyModuleCtx ValkeyModuleCtx; typedef struct ValkeyModuleIO ValkeyModuleIO; typedef struct ValkeyModuleDigest ValkeyModuleDigest; @@ -796,6 +801,7 @@ typedef struct ValkeyModuleDefragCtx ValkeyModuleDefragCtx; /* Function pointers needed by both the core and modules, these needs to be * exposed since you can't cast a function pointer to (void *). */ +typedef void (*sharedSdsFreeCB)(void *, size_t); typedef void (*ValkeyModuleInfoFunc)(ValkeyModuleInfoCtx *ctx, int for_crash_report); typedef void (*ValkeyModuleDefragFunc)(ValkeyModuleDefragCtx *ctx); typedef void (*ValkeyModuleUserChangedFunc)(uint64_t client_id, void *privdata); @@ -1119,6 +1125,7 @@ typedef int (*ValkeyModuleAuthCallback)(ValkeyModuleCtx *ctx, ValkeyModuleString *username, ValkeyModuleString *password, ValkeyModuleString **err); +typedef void *(*ValkeyModuleSharedSDSAllocFunc)(size_t len, size_t *alloc); typedef struct ValkeyModuleTypeMethods { uint64_t version; @@ -1881,6 +1888,11 @@ VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx * const char *engine_name) VALKEYMODULE_ATTR; VALKEYMODULE_API ValkeyModuleScriptingEngineExecutionState (*ValkeyModule_GetFunctionExecutionState)(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) VALKEYMODULE_ATTR; +VALKEYMODULE_API ValkeyModuleSharedSDS *(*ValkeyModule_CreateSharedSDS)(size_t len, + ValkeyModuleSharedSDSAllocFunc allocfn, + ValkeyModuleSharedSDSFreeCBFunc freecbfn) VALKEYMODULE_ATTR; +VALKEYMODULE_API char *(*ValkeyModule_SharedSDSPtrLen)(ValkeyModuleSharedSDS *shared_sds, size_t *len) VALKEYMODULE_ATTR; +VALKEYMODULE_API void (*ValkeyModule_ReleaseSharedSDS)(ValkeyModuleSharedSDS *shared_sds) VALKEYMODULE_ATTR; #define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX) @@ -2253,6 +2265,9 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in VALKEYMODULE_GET_API(RegisterScriptingEngine); VALKEYMODULE_GET_API(UnregisterScriptingEngine); VALKEYMODULE_GET_API(GetFunctionExecutionState); + VALKEYMODULE_GET_API(CreateSharedSDS); + VALKEYMODULE_GET_API(SharedSDSPtrLen); + VALKEYMODULE_GET_API(ReleaseSharedSDS); if (ValkeyModule_IsModuleNameBusy && ValkeyModule_IsModuleNameBusy(name)) return VALKEYMODULE_ERR; ValkeyModule_SetModuleAttribs(ctx, name, ver, apiver); From 47a948777f672b9404b0147edf5ae71aa54cda53 Mon Sep 17 00:00:00 2001 From: yairgott Date: Mon, 3 Mar 2025 23:20:37 -0800 Subject: [PATCH 2/2] adding TCL Signed-off-by: yairgott --- src/module.c | 23 +++++++----- src/sds.c | 33 +++++++++++------ src/sds.h | 15 ++++---- src/t_zset.c | 2 +- src/valkeymodule.h | 4 +- tests/modules/hash.c | 70 ++++++++++++++++++++++++++++------- tests/unit/moduleapi/hash.tcl | 8 +++- 7 files changed, 109 insertions(+), 46 deletions(-) diff --git a/src/module.c b/src/module.c index a0812eb178..0173104c00 100644 --- a/src/module.c +++ b/src/module.c @@ -517,7 +517,7 @@ static void zsetKeyReset(ValkeyModuleKey *key); static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key); void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d); void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data); -void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds); +void VM_ReleaseSharedSDS(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds); /* Helpers for VM_SetCommandInfo. */ static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info); @@ -2685,7 +2685,7 @@ void autoMemoryCollect(ValkeyModuleCtx *ctx) { case VALKEYMODULE_AM_KEY: VM_CloseKey(ptr); break; case VALKEYMODULE_AM_DICT: VM_FreeDict(NULL, ptr); break; case VALKEYMODULE_AM_INFO: VM_FreeServerInfo(NULL, ptr); break; - case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(ptr); break; + case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(NULL, ptr); break; } } ctx->flags |= VALKEYMODULE_CTX_AUTO_MEMORY; @@ -5462,9 +5462,9 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) { if (key->value->encoding == OBJ_ENCODING_HASHTABLE) { sds value_sds = hashTypeGetFromHashTable(key->value, field->ptr); if (value_sds && sdsType(value_sds) == SDS_TYPE_32_SHARED) { - *valueptr = (ValkeyModuleSharedSDS *)(value_sds - sdsHdrSize(sdsType(value_sds))); + *valueptr = (ValkeyModuleSharedSDS *)sdsAllocPtr(value_sds); sdsRetain(*valueptr); - autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr); + if (key->ctx != NULL) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr); } } } else { @@ -13309,10 +13309,12 @@ ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState( * Returns: * - A pointer to the created shared SDS object. */ -ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) { +ValkeyModuleSharedSDS *VM_CreateSharedSDS(ValkeyModuleCtx *ctx, size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) { size_t alloc; - void *buf = allocfn(len + sizeof(ValkeyModuleSharedSDS) + 1, &alloc); - return sdsInitShared((char *)buf, len, alloc, freecbfn); + void *buf = allocfn(len + offsetof(sdshdr32shared, buf) + 1, &alloc); + ValkeyModuleSharedSDS *shared_sds = sdsInitShared((char *)buf, len, alloc, freecbfn); + if (ctx != NULL) autoMemoryAdd(ctx, VALKEYMODULE_AM_SHARED_SDS, shared_sds); + return shared_sds; } /* Retrieves the pointer to the shared SDS buffer along with its length. @@ -13326,7 +13328,7 @@ ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAlloc */ char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) { *len = shared_sds->len; - return (char *)shared_sds + sizeof(ValkeyModuleSharedSDS); + return (char *)shared_sds + offsetof(sdshdr32shared, buf); } /* Releases a shared SDS object by decrementing its intrusive reference count. @@ -13337,8 +13339,9 @@ char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) { * Parameters: * - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object to be released. */ -void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds) { - sdsfree(shared_sds->buf); +void VM_ReleaseSharedSDS(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds) { + int res = sdsReleaseShared(shared_sds); + if (ctx != NULL && res) autoMemoryFreed(ctx, VALKEYMODULE_AM_SHARED_SDS, shared_sds); } /* MODULE command. diff --git a/src/sds.c b/src/sds.c index 95222d915c..7740f87b32 100644 --- a/src/sds.c +++ b/src/sds.c @@ -209,15 +209,22 @@ void sdsfree(sds s) { if (s == NULL) return; if (sdsType(s) == SDS_TYPE_32_SHARED) { SDS_HDR_VAR(32shared, s); - if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) { - return; - } - sh->freecbfn(sh, sdsAllocSize(s)); - s_free_with_size(s + sh->len + 1 - sdsAllocSize(s), sdsAllocSize(s)); + sdsReleaseShared(sh); return; } s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s)); } +/* return 1 if the shared sds is freed otherwise 0 */ +int sdsReleaseShared(sdshdrshared *sh) { + if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) { + return 0; + } + char *ptr = sh->buf + sh->len + 1 - sdsAllocSize(sh->buf); + size_t alloc_size = sdsAllocSize(sh->buf); + sh->freecbfn(ptr, alloc_size); + s_free_with_size(ptr, alloc_size); + return 1; +} /* This variant of sdsfree() gets its argument as void, and is useful * as free method in data structures that expect a 'void free_object(void*)' @@ -450,7 +457,11 @@ size_t sdsAllocSize(const_sds s) { /* Return the pointer of the actual SDS allocation (normally SDS strings * are referenced by the start of the string buffer). */ void *sdsAllocPtr(const_sds s) { - return (void *)(s - sdsHdrSize(sdsType(s))); + char type = sdsType(s); + if (type == SDS_TYPE_32_SHARED) { + return (void *)(s - offsetof(sdshdrshared, buf)); + } + return (void *)(s - sdsHdrSize(type)); } /* Initialize a shared sds into a buffer `buf`. @@ -462,7 +473,7 @@ void *sdsAllocPtr(const_sds s) { * - `freecbfn`: A callback function that is invoked when the sds object is released. * * Returns: - * - A pointer to the initialized `sdshdr32shared` structure. + * - A pointer to the initialized `sdshdrshared` structure. * * Notes: * - The caller is responsible for ensuring that `buf` is large enough to accommodate @@ -470,13 +481,13 @@ void *sdsAllocPtr(const_sds s) { * - The `freecbfn` does not directly free memory but is used primarily for * tracking deallocation events. */ -sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) { +sdshdrshared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) { buf[alloc - 1] = 0; sds s = buf + alloc - len - 1; SDS_HDR_VAR(32shared, s); sh->freecbfn = freecbfn; atomic_init(&sh->refcount, 1); - sh->len = len; + sh->len = (uint32_t)len; sh->alloc = alloc - sdsHdrSize(SDS_TYPE_32_SHARED) - 1; sh->flags = SDS_TYPE_32_SHARED; return sh; @@ -485,10 +496,10 @@ sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFree /* Increases the reference count of a shared SDS object. * * Parameters: - * - `sh`: A pointer to the `sdshdr32shared` structure whose reference count + * - `sh`: A pointer to the `sdshdrshared` structure whose reference count * should be increased. */ -void sdsRetain(sdshdr32shared *sh) { +void sdsRetain(sdshdrshared *sh) { atomic_fetch_add_explicit(&sh->refcount, 1, memory_order_relaxed); } /* Increment the sds length and decrements the left free space at the diff --git a/src/sds.h b/src/sds.h index e69bdc5ca4..0edf0d34b5 100644 --- a/src/sds.h +++ b/src/sds.h @@ -39,6 +39,7 @@ extern const char *SDS_NOINIT; #include #include #include +#include /* Constness: * @@ -82,7 +83,7 @@ typedef struct __attribute__((__packed__)) sdshdr32shared { uint32_t alloc; /* excluding the header and null terminator */ unsigned char flags; /* 3 lsb of type, 5 unused bits */ char buf[]; -} sdshdr32shared; +} sdshdrshared; struct __attribute__((__packed__)) sdshdr64 { uint64_t len; /* used */ uint64_t alloc; /* excluding the header and null terminator */ @@ -98,8 +99,8 @@ struct __attribute__((__packed__)) sdshdr64 { #define SDS_TYPE_32_SHARED 5 #define SDS_TYPE_MASK 7 #define SDS_TYPE_BITS 3 -#define SDS_HDR_VAR(T, s) struct sdshdr##T *sh = (void *)((s) - (sizeof(struct sdshdr##T))); -#define SDS_HDR(T, s) ((struct sdshdr##T *)((s) - (sizeof(struct sdshdr##T)))) +#define SDS_HDR_VAR(T, s) struct sdshdr##T *sh = (void *)((s) - (offsetof(struct sdshdr##T, buf))); +#define SDS_HDR(T, s) ((struct sdshdr##T *)((s) - (offsetof(struct sdshdr##T, buf)))) #define SDS_TYPE_5_LEN(f) ((unsigned char)(f) >> SDS_TYPE_BITS) static inline unsigned char sdsType(const_sds s) { @@ -164,8 +165,7 @@ static inline size_t sdsavail(const_sds s) { return sh->alloc - sh->len; } case SDS_TYPE_32_SHARED: { - SDS_HDR_VAR(32shared, s); - return sh->alloc - sh->len; + return 0; } } return 0; @@ -289,8 +289,9 @@ sds sdsRemoveFreeSpace(sds s, int would_regrow); sds sdsResize(sds s, size_t size, int would_regrow); size_t sdsAllocSize(const_sds s); void *sdsAllocPtr(const_sds s); -sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn); -void sdsRetain(sdshdr32shared *sh); +sdshdrshared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn); +void sdsRetain(sdshdrshared *sh); +int sdsReleaseShared(sdshdrshared *sh); /* Returns the minimum required size to store an sds string of the given length * and type. */ diff --git a/src/t_zset.c b/src/t_zset.c index d73faa8b0c..839b8a7be0 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -4235,7 +4235,7 @@ void zrandmemberWithCountCommand(client *c, long l, int withscores) { while (added < count) { listpackEntry key; - double score; + double score = 0; zsetTypeRandomElement(zsetobj, size, &key, withscores ? &score : NULL); /* Try to add the object to the dictionary. If it already exists diff --git a/src/valkeymodule.h b/src/valkeymodule.h index 6b4becd4ab..1ca1df8816 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -1888,11 +1888,11 @@ VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx * const char *engine_name) VALKEYMODULE_ATTR; VALKEYMODULE_API ValkeyModuleScriptingEngineExecutionState (*ValkeyModule_GetFunctionExecutionState)(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) VALKEYMODULE_ATTR; -VALKEYMODULE_API ValkeyModuleSharedSDS *(*ValkeyModule_CreateSharedSDS)(size_t len, +VALKEYMODULE_API ValkeyModuleSharedSDS *(*ValkeyModule_CreateSharedSDS)(ValkeyModuleCtx *ctx, size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) VALKEYMODULE_ATTR; VALKEYMODULE_API char *(*ValkeyModule_SharedSDSPtrLen)(ValkeyModuleSharedSDS *shared_sds, size_t *len) VALKEYMODULE_ATTR; -VALKEYMODULE_API void (*ValkeyModule_ReleaseSharedSDS)(ValkeyModuleSharedSDS *shared_sds) VALKEYMODULE_ATTR; +VALKEYMODULE_API void (*ValkeyModule_ReleaseSharedSDS)(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds) VALKEYMODULE_ATTR; #define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX) diff --git a/tests/modules/hash.c b/tests/modules/hash.c index 4bee4d2a98..acf7a8152a 100644 --- a/tests/modules/hash.c +++ b/tests/modules/hash.c @@ -2,11 +2,52 @@ #include #include #include +#include + +#define UNUSED(V) ((void) V) + +int shared_sds_cnt = 0; +ValkeyModuleSharedSDS *shared_sds_arr[4]; + +void *shared_sds_alloc(size_t len, size_t *alloc) { + *alloc = 2 * len; + return malloc(*alloc * sizeof(char)); +} + +void shared_sds_free_cb(void *ptr, size_t size) { + UNUSED(size); + UNUSED(ptr); +} + +void check_hash_get_shared_sds(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int result) { + if (result == 0) return; + ValkeyModuleKey *key = ValkeyModule_OpenKey(ctx, argv[1], VALKEYMODULE_READ); + for (int i = 0; i < shared_sds_cnt; ++i) { + ValkeyModuleSharedSDS *get_value = NULL; + ValkeyModule_HashGet(key, VALKEYMODULE_HASH_SHAREBLE_VALUES, + argv[i*2 + 3], &get_value, NULL); + if (get_value) { + ValkeyModule_Assert(shared_sds_arr[i] == get_value); + } + } +} /* If a string is ":deleted:", the special value for deleted hash fields is * returned; otherwise the input string is returned. */ -static ValkeyModuleString *value_or_delete(ValkeyModuleString *s) { - if (!strcasecmp(ValkeyModule_StringPtrLen(s, NULL), ":delete:")) +static void *value_or_delete(ValkeyModuleCtx *ctx, ValkeyModuleString *s, int flags) { + size_t str_len; + const char *str = ValkeyModule_StringPtrLen(s, &str_len); + if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) { + ValkeyModuleSharedSDS *shared_sds = ValkeyModule_CreateSharedSDS(ctx, str_len, shared_sds_alloc, shared_sds_free_cb); + size_t sds_len; + char *sds_str = ValkeyModule_SharedSDSPtrLen(shared_sds, &sds_len); + ValkeyModule_Assert(sds_len == str_len); + memcpy(sds_str, str, str_len); + shared_sds_arr[shared_sds_cnt++] = shared_sds; + return shared_sds; + } + shared_sds_arr[shared_sds_cnt++] = NULL; + if (!strcasecmp(str, ":delete:")) return VALKEYMODULE_HASH_DELETE; else return s; @@ -22,6 +63,7 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { if (argc < 5 || argc % 2 == 0 || argc > 11) return ValkeyModule_WrongArity(ctx); + shared_sds_cnt = 0; ValkeyModule_AutoMemory(ctx); ValkeyModuleKey *key = ValkeyModule_OpenKey(ctx, argv[1], VALKEYMODULE_WRITE); @@ -33,6 +75,7 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { case 'n': flags |= VALKEYMODULE_HASH_NX; break; case 'x': flags |= VALKEYMODULE_HASH_XX; break; case 'a': flags |= VALKEYMODULE_HASH_COUNT_ALL; break; + case 's': flags |= VALKEYMODULE_HASH_SHAREBLE_VALUES; break; } } @@ -41,30 +84,29 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { errno = 0; if (argc == 5) { result = ValkeyModule_HashSet(key, flags, - argv[3], value_or_delete(argv[4]), + argv[3], value_or_delete(ctx, argv[4], flags), NULL); } else if (argc == 7) { result = ValkeyModule_HashSet(key, flags, - argv[3], value_or_delete(argv[4]), - argv[5], value_or_delete(argv[6]), + argv[3], value_or_delete(ctx, argv[4], flags), + argv[5], value_or_delete(ctx, argv[6], flags), NULL); } else if (argc == 9) { result = ValkeyModule_HashSet(key, flags, - argv[3], value_or_delete(argv[4]), - argv[5], value_or_delete(argv[6]), - argv[7], value_or_delete(argv[8]), + argv[3], value_or_delete(ctx, argv[4], flags), + argv[5], value_or_delete(ctx, argv[6], flags), + argv[7], value_or_delete(ctx, argv[8], flags), NULL); } else if (argc == 11) { result = ValkeyModule_HashSet(key, flags, - argv[3], value_or_delete(argv[4]), - argv[5], value_or_delete(argv[6]), - argv[7], value_or_delete(argv[8]), - argv[9], value_or_delete(argv[10]), + argv[3], value_or_delete(ctx, argv[4], flags), + argv[5], value_or_delete(ctx, argv[6], flags), + argv[7], value_or_delete(ctx, argv[8], flags), + argv[9], value_or_delete(ctx, argv[10], flags), NULL); } else { return ValkeyModule_ReplyWithError(ctx, "ERR too many fields"); } - /* Check errno */ if (result == 0) { if (errno == ENOTSUP) @@ -72,7 +114,7 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { else ValkeyModule_Assert(errno == ENOENT); } - + check_hash_get_shared_sds(ctx, argv, result); return ValkeyModule_ReplyWithLongLong(ctx, result); } diff --git a/tests/unit/moduleapi/hash.tcl b/tests/unit/moduleapi/hash.tcl index 116b1c5120..2e04f68e1d 100644 --- a/tests/unit/moduleapi/hash.tcl +++ b/tests/unit/moduleapi/hash.tcl @@ -18,8 +18,14 @@ start_server {tags {"modules"}} { assert_equal 0 [r hash.set k "xa" new stuff not inserted] assert_equal 1 [r hash.set k "x" squirrel ofcourse] assert_equal 1 [r hash.set k "" sushi :delete: none :delete:] + assert_equal 1 [r hash.set k "nas" carrot orange] + assert_equal 0 [r hash.set k "nas" carrot pink] + assert_equal 1 [r hash.set k "a" apple red] + assert_equal 1 [r hash.set k "xs" apple green] + assert_equal 1 [r hash.set k "sa" tomato red] + assert_equal 1 [r hash.set k "" tomato :delete:] r hgetall k - } {squirrel ofcourse banana no what nothing something nice} + } {squirrel ofcourse banana no what nothing something nice carrot orange apple green} test "Unload the module - hash" { assert_equal {OK} [r module unload hash]