Skip to content

Commit

Permalink
Re-think DB selection
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Sep 4, 2024
1 parent d4a21c7 commit f9f5a39
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 26 deletions.
3 changes: 1 addition & 2 deletions include/redisx-priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ typedef struct {
typedef struct {
uint32_t addr; ///< The 32-bit inet address
int port; ///< port number (usually 6379)
int dbIndex; ///< the zero-based database index
char *password; ///< Redis password (if any)

RedisClient *clients;
Expand All @@ -83,7 +84,6 @@ typedef struct {
} RedisPrivate;



// in redisx-sub.c ------------------------>
void rConfigLock(Redis *redis);
void rConfigUnlock(Redis *redis);
Expand All @@ -92,7 +92,6 @@ int rStartSubscriptionListenerAsync(Redis *redis);

// in redisx-sub.c ------------------------>
void *RedisSubscriptionListener(void *pRedis);
void rEndSubscriptionAsync(Redis *redis);

// in redisx-net.c ------------------------>
void rInitClient(RedisClient *cl, enum redisx_channel channel);
Expand Down
6 changes: 2 additions & 4 deletions include/redisx.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ enum redisx_channel {
SUBSCRIPTION_CHANNEL ///< \hideinitializer Redis channel number for PUB/SUB messages
};

#define REDIS_CHANNELS (SUBSCRIPTION_CHANNEL + 1) ///< \hideinitializer The number of channels a Redis instance has.
#define REDISX_CHANNELS (SUBSCRIPTION_CHANNEL + 1) ///< \hideinitializer The number of channels a Redis instance has.


/**
Expand Down Expand Up @@ -235,8 +235,7 @@ boolean redisxIsConnected(Redis *redis);
boolean redisxHasPipeline(Redis *redis);

RedisClient *redisxGetClient(Redis *redis, enum redisx_channel channel);
int redisxSelectDB(RedisClient *cl, int idx);
int redisxResetClient(RedisClient *cl);
int redisxSelectDB(Redis *redis, int idx, boolean confirm);

void redisxAddConnectHook(Redis *redis, void (*setupCall)(Redis *));
void redisxRemoveConnectHook(Redis *redis, void (*setupCall)(Redis *));
Expand Down Expand Up @@ -295,7 +294,6 @@ RESP *redisxReadReplyAsync(RedisClient *cl);
int redisxIgnoreReplyAsync(RedisClient *cl);
int redisxSkipReplyAsync(RedisClient *cl);
int redisxPublishAsync(Redis *redis, const char *channel, const char *data, int length);
int redisxSelectDBAsync(RedisClient *cl, int idx, boolean confirm);

// Error generation with stderr message...
int redisxError(const char *func, int errorCode);
Expand Down
5 changes: 2 additions & 3 deletions src/redisx-net.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ static void rDisconnectAsync(Redis *redis) {
p->isPipelineListenerEnabled = FALSE;

// Gracefully end subscriptions and close subscription client
rEndSubscriptionAsync(redis);
rCloseClient(redis->pipeline);
rCloseClient(redis->interactive);

Expand Down Expand Up @@ -366,7 +365,7 @@ void rShutdownLinkAsync(Redis *redis) {
int i;

// NOTE: Don't use client locks, as they may deadlock when trying to shut down...
for(i=0; i<REDIS_CHANNELS; i++) rDisconnectClientAsync(&p->clients[i]);
for(i=0; i<REDISX_CHANNELS; i++) rDisconnectClientAsync(&p->clients[i]);
}

/// \cond PRIVATE
Expand Down Expand Up @@ -650,7 +649,7 @@ RedisClient *redisxGetClient(Redis *redis, enum redisx_channel channel) {
if(redis == NULL) return NULL;

p = (RedisPrivate *) redis->priv;
if(channel < 0 || channel >= REDIS_CHANNELS) return NULL;
if(channel < 0 || channel >= REDISX_CHANNELS) return NULL;
return &p->clients[channel];
}

Expand Down
68 changes: 51 additions & 17 deletions src/redisx.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static pthread_attr_t threadConfig;

/// \cond PRIVATE


/**
* Waits to get exlusive access to configuring the properties of a Redis instance.
*
Expand Down Expand Up @@ -203,7 +204,7 @@ Redis *redisxInit(const char *server) {
p->clients = (RedisClient *) calloc(3, sizeof(RedisClient));

// Initialize the store access mutexes for each client channel.
for(i=REDIS_CHANNELS; --i >= 0; ) rInitClient(&p->clients[i], i);
for(i=REDISX_CHANNELS; --i >= 0; ) rInitClient(&p->clients[i], i);

redis = (Redis *) calloc(1, sizeof(Redis));
redis->priv = p;
Expand All @@ -212,7 +213,7 @@ Redis *redisxInit(const char *server) {
redis->subscription = &p->clients[SUBSCRIPTION_CHANNEL];
redis->id = xStringCopyOf(ipAddress);

for(i=REDIS_CHANNELS; --i >= 0; ) {
for(i=REDISX_CHANNELS; --i >= 0; ) {
ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv;
cp->redis = redis;
}
Expand Down Expand Up @@ -276,7 +277,7 @@ void redisxDestroy(Redis *redis) {

if(redisxIsConnected(redis)) redisxDisconnect(redis);

for(i=REDIS_CHANNELS; --i >= 0; ) {
for(i=REDISX_CHANNELS; --i >= 0; ) {
ClientPrivate *cp = (ClientPrivate *) p->clients[i].priv;
pthread_mutex_destroy(&cp->readLock);
pthread_mutex_destroy(&cp->writeLock);
Expand Down Expand Up @@ -432,8 +433,8 @@ int redisxPing(Redis *redis, const char *message) {
* @sa redisxSelectDB()
* @sa redisxLockEnabled()
*/
int redisxSelectDBAsync(RedisClient *cl, int idx, boolean confirm) {
static const char *funcName = "redisxSelectDBAsync()";
int redisxSelectClientDBAsync(RedisClient *cl, int idx, boolean confirm) {
static const char *funcName = "redisxSelectClientDBAsync()";

char sval[20];
int status;
Expand All @@ -460,34 +461,67 @@ int redisxSelectDBAsync(RedisClient *cl, int idx, boolean confirm) {
return X_SUCCESS;
}

static void rAffirmDB(Redis *redis) {
const RedisPrivate *p = (RedisPrivate *) redis->priv;
redisxSelectDB(redis, p->dbIndex, TRUE);
}

/**
* Siwtches to another database. See the Redis `SELECT` command.
* Switches to another database index on the Redis server. Note that you cannot change the database on an active
* PUB/SUB channel, hence the call will return X_INCOMPLETE if attempted. You should instead switch DB when there
* are no active subscriptions.
*
* @param cl the redis client
* @param redis Pointer to a Redis instance.
* @param idx zero-based database index
* @return X_SUCCESS (0) if successful, or else an error code (&lt;0) from redisx.h / xchange.h.
* @param confirm Whether to wait for confirmation from Redis, and check the response.
* @return X_SUCCESS (0) if successful, or
* X_NULL if the redis argument is NULL,
* X_NO_INIT if the redis instance is not connected,
* X_INCOMPLETE if there is an active subscription channel that cannot be switched or
* one of the channels could not confirm the switch, or
* else another error code (&lt;0) from redisx.h / xchange.h.
*
* @sa redisxSelectDBAsync()
* @sa redisxSelectDB()
* @sa redisxLockEnabled()
*/
int redisxSelectDB(RedisClient *cl, int idx) {
int redisxSelectDB(Redis *redis, int idx, boolean confirm) {
static const char *funcName = "redisxSelectDB()";

const ClientPrivate *cp;
RedisPrivate *p;
enum redisx_channel c;
int status = X_SUCCESS;

if(cl == NULL) return redisxError(funcName, X_NULL);
if(!redis) return redisxError(funcName, X_NULL);
if(!redisxIsConnected(redis)) return redisxError(funcName, X_NO_INIT);

cp = (ClientPrivate *) cl->priv;
p = (RedisPrivate *) redis->priv;

status = redisxLockEnabled(cl);
if(status) return redisxError(funcName, status);
if(p->dbIndex == idx) return X_SUCCESS;

status = redisxSelectDBAsync(cl, idx, cp->idx == PIPELINE_CHANNEL);
redisxUnlockClient(cl);
redisxAddConnectHook(redis, rAffirmDB);
p->dbIndex = idx;

for(c = 0; c < REDISX_CHANNELS; c++) {
RedisClient *cl = redisxGetClient(redis, c);
int s = redisxLockEnabled(cl);

// We can't switch unconnected clients or the existing subscription client
if(s == REDIS_INVALID_CHANNEL || c == SUBSCRIPTION_CHANNEL) {
if(!s) status = X_INCOMPLETE;
redisxUnlockClient(cl);
continue;
}

s = redisxSelectClientDBAsync(cl, idx, confirm && c != PIPELINE_CHANNEL);
redisxUnlockClient(cl);

if(s) status = X_INCOMPLETE;
}

return status;
}


/**
* Sends a `RESET` request to the specified Redis client. The server will perform a reset as if the
* client disconnected and reconnected again.
Expand Down

0 comments on commit f9f5a39

Please sign in to comment.