Skip to content

Commit

Permalink
adding TCL
Browse files Browse the repository at this point in the history
Signed-off-by: yairgott <[email protected]>
  • Loading branch information
yairgott committed Mar 5, 2025
1 parent f9aad1a commit 47a9487
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 46 deletions.
23 changes: 13 additions & 10 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ static void zsetKeyReset(ValkeyModuleKey *key);
static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key);
void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d);
void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data);
void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds);
void VM_ReleaseSharedSDS(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds);

/* Helpers for VM_SetCommandInfo. */
static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info);
Expand Down Expand Up @@ -2685,7 +2685,7 @@ void autoMemoryCollect(ValkeyModuleCtx *ctx) {
case VALKEYMODULE_AM_KEY: VM_CloseKey(ptr); break;
case VALKEYMODULE_AM_DICT: VM_FreeDict(NULL, ptr); break;
case VALKEYMODULE_AM_INFO: VM_FreeServerInfo(NULL, ptr); break;
case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(ptr); break;
case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(NULL, ptr); break;
}
}
ctx->flags |= VALKEYMODULE_CTX_AUTO_MEMORY;
Expand Down Expand Up @@ -5462,9 +5462,9 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {
if (key->value->encoding == OBJ_ENCODING_HASHTABLE) {
sds value_sds = hashTypeGetFromHashTable(key->value, field->ptr);
if (value_sds && sdsType(value_sds) == SDS_TYPE_32_SHARED) {
*valueptr = (ValkeyModuleSharedSDS *)(value_sds - sdsHdrSize(sdsType(value_sds)));
*valueptr = (ValkeyModuleSharedSDS *)sdsAllocPtr(value_sds);
sdsRetain(*valueptr);
autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr);
if (key->ctx != NULL) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr);
}
}
} else {
Expand Down Expand Up @@ -13309,10 +13309,12 @@ ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
* Returns:
* - A pointer to the created shared SDS object.
*/
ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) {
ValkeyModuleSharedSDS *VM_CreateSharedSDS(ValkeyModuleCtx *ctx, size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) {
size_t alloc;
void *buf = allocfn(len + sizeof(ValkeyModuleSharedSDS) + 1, &alloc);
return sdsInitShared((char *)buf, len, alloc, freecbfn);
void *buf = allocfn(len + offsetof(sdshdr32shared, buf) + 1, &alloc);
ValkeyModuleSharedSDS *shared_sds = sdsInitShared((char *)buf, len, alloc, freecbfn);
if (ctx != NULL) autoMemoryAdd(ctx, VALKEYMODULE_AM_SHARED_SDS, shared_sds);
return shared_sds;
}

/* Retrieves the pointer to the shared SDS buffer along with its length.
Expand All @@ -13326,7 +13328,7 @@ ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAlloc
*/
char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) {
*len = shared_sds->len;
return (char *)shared_sds + sizeof(ValkeyModuleSharedSDS);
return (char *)shared_sds + offsetof(sdshdr32shared, buf);
}

/* Releases a shared SDS object by decrementing its intrusive reference count.
Expand All @@ -13337,8 +13339,9 @@ char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) {
* Parameters:
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object to be released.
*/
void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds) {
sdsfree(shared_sds->buf);
void VM_ReleaseSharedSDS(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds) {
int res = sdsReleaseShared(shared_sds);
if (ctx != NULL && res) autoMemoryFreed(ctx, VALKEYMODULE_AM_SHARED_SDS, shared_sds);
}

/* MODULE command.
Expand Down
33 changes: 22 additions & 11 deletions src/sds.c
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,22 @@ void sdsfree(sds s) {
if (s == NULL) return;
if (sdsType(s) == SDS_TYPE_32_SHARED) {
SDS_HDR_VAR(32shared, s);
if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) {
return;
}
sh->freecbfn(sh, sdsAllocSize(s));
s_free_with_size(s + sh->len + 1 - sdsAllocSize(s), sdsAllocSize(s));
sdsReleaseShared(sh);
return;
}
s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s));
}
/* return 1 if the shared sds is freed otherwise 0 */
int sdsReleaseShared(sdshdrshared *sh) {
if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) {
return 0;
}
char *ptr = sh->buf + sh->len + 1 - sdsAllocSize(sh->buf);
size_t alloc_size = sdsAllocSize(sh->buf);
sh->freecbfn(ptr, alloc_size);
s_free_with_size(ptr, alloc_size);
return 1;
}

/* This variant of sdsfree() gets its argument as void, and is useful
* as free method in data structures that expect a 'void free_object(void*)'
Expand Down Expand Up @@ -450,7 +457,11 @@ size_t sdsAllocSize(const_sds s) {
/* Return the pointer of the actual SDS allocation (normally SDS strings
* are referenced by the start of the string buffer). */
void *sdsAllocPtr(const_sds s) {
return (void *)(s - sdsHdrSize(sdsType(s)));
char type = sdsType(s);
if (type == SDS_TYPE_32_SHARED) {
return (void *)(s - offsetof(sdshdrshared, buf));
}
return (void *)(s - sdsHdrSize(type));
}

/* Initialize a shared sds into a buffer `buf`.
Expand All @@ -462,21 +473,21 @@ void *sdsAllocPtr(const_sds s) {
* - `freecbfn`: A callback function that is invoked when the sds object is released.
*
* Returns:
* - A pointer to the initialized `sdshdr32shared` structure.
* - A pointer to the initialized `sdshdrshared` structure.
*
* Notes:
* - The caller is responsible for ensuring that `buf` is large enough to accommodate
* the SDS metadata, the string content, and the null terminator.
* - The `freecbfn` does not directly free memory but is used primarily for
* tracking deallocation events.
*/
sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) {
sdshdrshared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) {
buf[alloc - 1] = 0;
sds s = buf + alloc - len - 1;
SDS_HDR_VAR(32shared, s);
sh->freecbfn = freecbfn;
atomic_init(&sh->refcount, 1);
sh->len = len;
sh->len = (uint32_t)len;
sh->alloc = alloc - sdsHdrSize(SDS_TYPE_32_SHARED) - 1;
sh->flags = SDS_TYPE_32_SHARED;
return sh;
Expand All @@ -485,10 +496,10 @@ sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFree
/* Increases the reference count of a shared SDS object.
*
* Parameters:
* - `sh`: A pointer to the `sdshdr32shared` structure whose reference count
* - `sh`: A pointer to the `sdshdrshared` structure whose reference count
* should be increased.
*/
void sdsRetain(sdshdr32shared *sh) {
void sdsRetain(sdshdrshared *sh) {
atomic_fetch_add_explicit(&sh->refcount, 1, memory_order_relaxed);
}
/* Increment the sds length and decrements the left free space at the
Expand Down
15 changes: 8 additions & 7 deletions src/sds.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ extern const char *SDS_NOINIT;
#include <stdarg.h>
#include <stdint.h>
#include <stdatomic.h>
#include <stddef.h>

/* Constness:
*
Expand Down Expand Up @@ -82,7 +83,7 @@ typedef struct __attribute__((__packed__)) sdshdr32shared {
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
} sdshdr32shared;
} sdshdrshared;
struct __attribute__((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
Expand All @@ -98,8 +99,8 @@ struct __attribute__((__packed__)) sdshdr64 {
#define SDS_TYPE_32_SHARED 5
#define SDS_TYPE_MASK 7
#define SDS_TYPE_BITS 3
#define SDS_HDR_VAR(T, s) struct sdshdr##T *sh = (void *)((s) - (sizeof(struct sdshdr##T)));
#define SDS_HDR(T, s) ((struct sdshdr##T *)((s) - (sizeof(struct sdshdr##T))))
#define SDS_HDR_VAR(T, s) struct sdshdr##T *sh = (void *)((s) - (offsetof(struct sdshdr##T, buf)));
#define SDS_HDR(T, s) ((struct sdshdr##T *)((s) - (offsetof(struct sdshdr##T, buf))))
#define SDS_TYPE_5_LEN(f) ((unsigned char)(f) >> SDS_TYPE_BITS)

static inline unsigned char sdsType(const_sds s) {
Expand Down Expand Up @@ -164,8 +165,7 @@ static inline size_t sdsavail(const_sds s) {
return sh->alloc - sh->len;
}
case SDS_TYPE_32_SHARED: {
SDS_HDR_VAR(32shared, s);
return sh->alloc - sh->len;
return 0;
}
}
return 0;
Expand Down Expand Up @@ -289,8 +289,9 @@ sds sdsRemoveFreeSpace(sds s, int would_regrow);
sds sdsResize(sds s, size_t size, int would_regrow);
size_t sdsAllocSize(const_sds s);
void *sdsAllocPtr(const_sds s);
sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn);
void sdsRetain(sdshdr32shared *sh);
sdshdrshared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn);
void sdsRetain(sdshdrshared *sh);
int sdsReleaseShared(sdshdrshared *sh);

/* Returns the minimum required size to store an sds string of the given length
* and type. */
Expand Down
2 changes: 1 addition & 1 deletion src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -4235,7 +4235,7 @@ void zrandmemberWithCountCommand(client *c, long l, int withscores) {

while (added < count) {
listpackEntry key;
double score;
double score = 0;
zsetTypeRandomElement(zsetobj, size, &key, withscores ? &score : NULL);

/* Try to add the object to the dictionary. If it already exists
Expand Down
4 changes: 2 additions & 2 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -1888,11 +1888,11 @@ VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx *
const char *engine_name) VALKEYMODULE_ATTR;

VALKEYMODULE_API ValkeyModuleScriptingEngineExecutionState (*ValkeyModule_GetFunctionExecutionState)(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) VALKEYMODULE_ATTR;
VALKEYMODULE_API ValkeyModuleSharedSDS *(*ValkeyModule_CreateSharedSDS)(size_t len,
VALKEYMODULE_API ValkeyModuleSharedSDS *(*ValkeyModule_CreateSharedSDS)(ValkeyModuleCtx *ctx, size_t len,
ValkeyModuleSharedSDSAllocFunc allocfn,
ValkeyModuleSharedSDSFreeCBFunc freecbfn) VALKEYMODULE_ATTR;
VALKEYMODULE_API char *(*ValkeyModule_SharedSDSPtrLen)(ValkeyModuleSharedSDS *shared_sds, size_t *len) VALKEYMODULE_ATTR;
VALKEYMODULE_API void (*ValkeyModule_ReleaseSharedSDS)(ValkeyModuleSharedSDS *shared_sds) VALKEYMODULE_ATTR;
VALKEYMODULE_API void (*ValkeyModule_ReleaseSharedSDS)(ValkeyModuleCtx *ctx, ValkeyModuleSharedSDS *shared_sds) VALKEYMODULE_ATTR;

#define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX)

Expand Down
70 changes: 56 additions & 14 deletions tests/modules/hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,52 @@
#include <strings.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>

#define UNUSED(V) ((void) V)

int shared_sds_cnt = 0;
ValkeyModuleSharedSDS *shared_sds_arr[4];

void *shared_sds_alloc(size_t len, size_t *alloc) {
*alloc = 2 * len;
return malloc(*alloc * sizeof(char));
}

void shared_sds_free_cb(void *ptr, size_t size) {
UNUSED(size);
UNUSED(ptr);
}

void check_hash_get_shared_sds(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int result) {
if (result == 0) return;
ValkeyModuleKey *key = ValkeyModule_OpenKey(ctx, argv[1], VALKEYMODULE_READ);
for (int i = 0; i < shared_sds_cnt; ++i) {
ValkeyModuleSharedSDS *get_value = NULL;
ValkeyModule_HashGet(key, VALKEYMODULE_HASH_SHAREBLE_VALUES,
argv[i*2 + 3], &get_value, NULL);
if (get_value) {
ValkeyModule_Assert(shared_sds_arr[i] == get_value);
}
}
}

/* If a string is ":deleted:", the special value for deleted hash fields is
* returned; otherwise the input string is returned. */
static ValkeyModuleString *value_or_delete(ValkeyModuleString *s) {
if (!strcasecmp(ValkeyModule_StringPtrLen(s, NULL), ":delete:"))
static void *value_or_delete(ValkeyModuleCtx *ctx, ValkeyModuleString *s, int flags) {
size_t str_len;
const char *str = ValkeyModule_StringPtrLen(s, &str_len);
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
ValkeyModuleSharedSDS *shared_sds = ValkeyModule_CreateSharedSDS(ctx, str_len, shared_sds_alloc, shared_sds_free_cb);
size_t sds_len;
char *sds_str = ValkeyModule_SharedSDSPtrLen(shared_sds, &sds_len);
ValkeyModule_Assert(sds_len == str_len);
memcpy(sds_str, str, str_len);
shared_sds_arr[shared_sds_cnt++] = shared_sds;
return shared_sds;
}
shared_sds_arr[shared_sds_cnt++] = NULL;
if (!strcasecmp(str, ":delete:"))
return VALKEYMODULE_HASH_DELETE;
else
return s;
Expand All @@ -22,6 +63,7 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
if (argc < 5 || argc % 2 == 0 || argc > 11)
return ValkeyModule_WrongArity(ctx);

shared_sds_cnt = 0;
ValkeyModule_AutoMemory(ctx);
ValkeyModuleKey *key = ValkeyModule_OpenKey(ctx, argv[1], VALKEYMODULE_WRITE);

Expand All @@ -33,6 +75,7 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
case 'n': flags |= VALKEYMODULE_HASH_NX; break;
case 'x': flags |= VALKEYMODULE_HASH_XX; break;
case 'a': flags |= VALKEYMODULE_HASH_COUNT_ALL; break;
case 's': flags |= VALKEYMODULE_HASH_SHAREBLE_VALUES; break;
}
}

Expand All @@ -41,38 +84,37 @@ int hash_set(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
errno = 0;
if (argc == 5) {
result = ValkeyModule_HashSet(key, flags,
argv[3], value_or_delete(argv[4]),
argv[3], value_or_delete(ctx, argv[4], flags),
NULL);
} else if (argc == 7) {
result = ValkeyModule_HashSet(key, flags,
argv[3], value_or_delete(argv[4]),
argv[5], value_or_delete(argv[6]),
argv[3], value_or_delete(ctx, argv[4], flags),
argv[5], value_or_delete(ctx, argv[6], flags),
NULL);
} else if (argc == 9) {
result = ValkeyModule_HashSet(key, flags,
argv[3], value_or_delete(argv[4]),
argv[5], value_or_delete(argv[6]),
argv[7], value_or_delete(argv[8]),
argv[3], value_or_delete(ctx, argv[4], flags),
argv[5], value_or_delete(ctx, argv[6], flags),
argv[7], value_or_delete(ctx, argv[8], flags),
NULL);
} else if (argc == 11) {
result = ValkeyModule_HashSet(key, flags,
argv[3], value_or_delete(argv[4]),
argv[5], value_or_delete(argv[6]),
argv[7], value_or_delete(argv[8]),
argv[9], value_or_delete(argv[10]),
argv[3], value_or_delete(ctx, argv[4], flags),
argv[5], value_or_delete(ctx, argv[6], flags),
argv[7], value_or_delete(ctx, argv[8], flags),
argv[9], value_or_delete(ctx, argv[10], flags),
NULL);
} else {
return ValkeyModule_ReplyWithError(ctx, "ERR too many fields");
}

/* Check errno */
if (result == 0) {
if (errno == ENOTSUP)
return ValkeyModule_ReplyWithError(ctx, VALKEYMODULE_ERRORMSG_WRONGTYPE);
else
ValkeyModule_Assert(errno == ENOENT);
}

check_hash_get_shared_sds(ctx, argv, result);
return ValkeyModule_ReplyWithLongLong(ctx, result);
}

Expand Down
8 changes: 7 additions & 1 deletion tests/unit/moduleapi/hash.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ start_server {tags {"modules"}} {
assert_equal 0 [r hash.set k "xa" new stuff not inserted]
assert_equal 1 [r hash.set k "x" squirrel ofcourse]
assert_equal 1 [r hash.set k "" sushi :delete: none :delete:]
assert_equal 1 [r hash.set k "nas" carrot orange]
assert_equal 0 [r hash.set k "nas" carrot pink]
assert_equal 1 [r hash.set k "a" apple red]
assert_equal 1 [r hash.set k "xs" apple green]
assert_equal 1 [r hash.set k "sa" tomato red]
assert_equal 1 [r hash.set k "" tomato :delete:]
r hgetall k
} {squirrel ofcourse banana no what nothing something nice}
} {squirrel ofcourse banana no what nothing something nice carrot orange apple green}

test "Unload the module - hash" {
assert_equal {OK} [r module unload hash]
Expand Down

0 comments on commit 47a9487

Please sign in to comment.