Skip to content

Commit

Permalink
BGSAVE work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
antirez committed Jan 5, 2011
1 parent 5133510 commit 249ad25
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 10 deletions.
47 changes: 47 additions & 0 deletions src/diskstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,50 @@ void dsFlushDb(int dbid) {
}
}
}

int dsRdbSave(char *filename) {
char tmpfile[256];
int j, i;
time_t now = time(NULL);

snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
return REDIS_ERR;
}
if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;

/* Scan all diskstore dirs looking for keys */
for (j = 0; j < 256; j++) {
for (i = 0; i < 256; i++) {
snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i);

/* Write the SELECT DB opcode */
if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
if (rdbSaveLen(fp,j) == -1) goto werr;
}
}

/* Make sure data will not remain on the OS's output buffers */
fflush(fp);
fsync(fileno(fp));
fclose(fp);

/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
unlink(tmpfile);
return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
return REDIS_OK;

werr:
fclose(fp);
unlink(tmpfile);
redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
}
22 changes: 18 additions & 4 deletions src/dscache.c
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ int processActiveIOJobs(int max) {

#if 0
/* If there are new jobs we need to signal the thread to
* process the next one. */
* process the next one. FIXME: drop this if useless. */
redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
listLength(server.io_newjobs),
listLength(server.io_processing));
Expand Down Expand Up @@ -576,7 +576,21 @@ void queueIOJob(iojob *j) {
spawnIOThread();
}

void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
/* Consume all the IO scheduled operations, and all the thread IO jobs
* so that eventually the state of diskstore is a point-in-time snapshot.
*
* This is useful when we need to BGSAVE with diskstore enabled. */
void cacheForcePointInTime(void) {
redisLog(REDIS_NOTICE,"Diskstore: synching on disk to reach point-in-time state.");
while (listLength(server.cache_io_queue) != 0) {
cacheScheduleIOPushJobs(REDIS_IO_ASAP);
processActiveIOJobs(1);
}
waitEmptyIOJobsQueue();
processAllPendingIOJobs();
}

void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
iojob *j;

j = zmalloc(sizeof(*j));
Expand Down Expand Up @@ -762,7 +776,7 @@ int cacheScheduleIOPushJobs(int flags) {
op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr);

if (op->type == REDIS_IO_LOAD) {
dsCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
} else {
/* Lookup the key, in order to put the current value in the IO
* Job. Otherwise if the key does not exists we schedule a disk
Expand All @@ -775,7 +789,7 @@ int cacheScheduleIOPushJobs(int flags) {
* the key on disk. */
val = NULL;
}
dsCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
}
/* Mark the operation as in progress. */
cacheScheduleIODelFlag(op->db,op->key,op->type);
Expand Down
19 changes: 13 additions & 6 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,10 @@ int rdbSave(char *filename) {
int j;
time_t now = time(NULL);

/* FIXME: implement .rdb save for disk store properly */
redisAssert(server.ds_enabled == 0);
if (server.ds_enabled) {
cacheForcePointInTime();
return dsRdbSave(filename);
}

snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
Expand Down Expand Up @@ -495,17 +497,22 @@ int rdbSaveBackground(char *filename) {
pid_t childpid;

if (server.bgsavechildpid != -1) return REDIS_ERR;
redisAssert(server.ds_enabled == 0);

server.dirty_before_bgsave = server.dirty;

if ((childpid = fork()) == 0) {
int retval;

/* Child */
if (server.ipfd > 0) close(server.ipfd);
if (server.sofd > 0) close(server.sofd);
if (rdbSave(filename) == REDIS_OK) {
_exit(0);
if (server.ds_enabled) {
cacheForcePointInTime();
dsRdbSave(filename);
} else {
_exit(1);
rdbSave(filename);
}
_exit((retval == REDIS_OK) ? 0 : 1);
} else {
/* Parent */
if (childpid == -1) {
Expand Down
1 change: 1 addition & 0 deletions src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,7 @@ void cacheCron(void);
int cacheKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyMayExist(redisDb *db, robj *key);
void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
void cacheForcePointInTime(void);

/* Set data type */
robj *setTypeCreate(robj *value);
Expand Down
18 changes: 18 additions & 0 deletions utils/whatisdoing.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# This script is from http://poormansprofiler.org/

#!/bin/bash
nsamples=1
sleeptime=0
pid=$(pidof redis-server)

for x in $(seq 1 $nsamples)
do
gdb -ex "set pagination 0" -ex "thread apply all bt" -batch -p $pid
sleep $sleeptime
done | \
awk '
BEGIN { s = ""; }
/Thread/ { print s; s = ""; }
/^\#/ { if (s != "" ) { s = s "," $4} else { s = $4 } }
END { print s }' | \
sort | uniq -c | sort -r -n -k 1,1

0 comments on commit 249ad25

Please sign in to comment.