Skip to content

Commit

Permalink
prepare for 0.1.1 release
Browse files Browse the repository at this point in the history
  • Loading branch information
stvchu committed Nov 26, 2008
1 parent 96c74c5 commit eb4948b
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 82 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
2008-11-25 Steve Chu <[email protected]>
* version 0.1.1 is released.

2008-11-25 Steve Chu <[email protected]>
* Bugfix: 'stats queue' did not return right queue name
* All replication code has been discarded and removed. Original replication
code is buggy, also nobody want to use replication on a message queue system.
* more graceful exit

2008-09-19 Steve Chu <[email protected]>
* version 0.1.0 is released.
Expand Down
119 changes: 70 additions & 49 deletions bdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ static int get_queue_db_handle(DB_TXN *txn, char *queue_name, size_t queue_name_
static int update_queue_length(DB_TXN *txn, char *queue_name, size_t queue_name_size, int64_t delta);
static void close_queue_db_list(void);

static void *bdb_chkpoint_thread __P((void *));
static void *bdb_memp_trickle_thread __P((void *));
static void *bdb_dl_detect_thread __P((void *));
static void bdb_event_callback __P((DB_ENV *, u_int32_t, void *));
static void bdb_err_callback(const DB_ENV *dbenv, const char *errpfx, const char *msg);
static void bdb_msg_callback(const DB_ENV *dbenv, const char *msg);

static pthread_t chk_ptid;
static pthread_t mtri_ptid;
static pthread_t dld_ptid;
Expand Down Expand Up @@ -76,10 +83,12 @@ void bdb_env_init(void){
}

/* set err&msg display */
envp->set_errfile(envp, stderr);
envp->set_errpfx(envp, PACKAGE);
envp->set_msgfile(envp, stderr);

/* env->set_errfile(env, stderr); */
/* env->set_msgfile(env, stderr); */
envp->set_errcall(envp, bdb_err_callback);
envp->set_msgcall(envp, bdb_msg_callback);

/* set BerkeleyDB verbose*/
if (settings.verbose > 1) {
if ((ret = envp->set_verbose(envp, DB_VERB_FILEOPS_ALL, 1)) != 0) {
Expand Down Expand Up @@ -113,8 +122,9 @@ void bdb_env_init(void){
envp->set_lk_max_objects(envp, 20000);
envp->set_tx_max(envp, 20000);



/* at least max active transactions */
envp->set_tx_max(envp, 10000);

/* set transaction log buffer */
envp->set_lg_bsize(envp, bdb_settings.txn_lg_bsize);

Expand Down Expand Up @@ -715,74 +725,101 @@ void start_dl_detect_thread(void){
}
}

void *bdb_chkpoint_thread(void *arg)
static void *bdb_chkpoint_thread(void *arg)
{
DB_ENV *dbenvp;
DB_ENV *dbenv;
int ret;
dbenvp = arg;
dbenv = arg;
if (settings.verbose > 1) {
dbenvp->errx(dbenvp, "checkpoint thread created: %lu, every %d seconds",
dbenv->errx(dbenv, "checkpoint thread created: %lu, every %d seconds",
(u_long)pthread_self(), bdb_settings.chkpoint_val);
}
for (;; sleep(bdb_settings.chkpoint_val)) {
if ((ret = dbenvp->txn_checkpoint(dbenvp, 0, 0, 0)) != 0) {
dbenvp->err(dbenvp, ret, "checkpoint thread");
}
if (settings.verbose > 1) {
dbenvp->errx(dbenvp, "checkpoint thread: done");
if ((ret = dbenv->txn_checkpoint(dbenv, 0, 0, 0)) != 0) {
dbenv->err(dbenv, ret, "checkpoint thread");
}
dbenv->errx(dbenv, "checkpoint thread: a txn_checkpoint is done");
}
return (NULL);
}

void *bdb_memp_trickle_thread(void *arg)
static void *bdb_memp_trickle_thread(void *arg)
{
DB_ENV *dbenvp;
DB_ENV *dbenv;
int ret, nwrotep;
dbenvp = arg;
dbenv = arg;
if (settings.verbose > 1) {
dbenvp->errx(dbenvp, "memp_trickle thread created: %lu, every %d seconds, %d%% pages should be clean.",
dbenv->errx(dbenv, "memp_trickle thread created: %lu, every %d seconds, %d%% pages should be clean.",
(u_long)pthread_self(), bdb_settings.memp_trickle_val,
bdb_settings.memp_trickle_percent);
}
for (;; sleep(bdb_settings.memp_trickle_val)) {
if ((ret = dbenvp->memp_trickle(dbenvp, bdb_settings.memp_trickle_percent, &nwrotep)) != 0) {
dbenvp->err(dbenvp, ret, "memp_trickle thread");
}
if (settings.verbose > 1) {
dbenvp->errx(dbenvp, "memp_trickle thread: done, writing %d dirty pages", nwrotep);
if ((ret = dbenv->memp_trickle(dbenv, bdb_settings.memp_trickle_percent, &nwrotep)) != 0) {
dbenv->err(dbenv, ret, "memp_trickle thread");
}
dbenv->errx(dbenv, "memp_trickle thread: writing %d dirty pages", nwrotep);
}
return (NULL);
}

void *bdb_dl_detect_thread(void *arg)
static void *bdb_dl_detect_thread(void *arg)
{
DB_ENV *dbenvp;
DB_ENV *dbenv;
struct timeval t;
dbenvp = arg;
dbenv = arg;
if (settings.verbose > 1) {
dbenvp->errx(dbenvp, "deadlock detecting thread created: %lu, every %d millisecond",
dbenv->errx(dbenv, "deadlock detecting thread created: %lu, every %d millisecond",
(u_long)pthread_self(), bdb_settings.dldetect_val);
}
while (!daemon_quit) {
t.tv_sec = 0;
t.tv_usec = bdb_settings.dldetect_val;
(void)dbenvp->lock_detect(dbenvp, 0, DB_LOCK_YOUNGEST, NULL);
(void)dbenv->lock_detect(dbenv, 0, DB_LOCK_YOUNGEST, NULL);
/* select is a more accurate sleep timer */
(void)select(0, NULL, NULL, NULL, &t);
}
return (NULL);
}

void bdb_event_callback(DB_ENV *envp, u_int32_t which, void *info)
static void bdb_event_callback(DB_ENV *env, u_int32_t which, void *info)
{
switch (which) {
case DB_EVENT_PANIC: /* FALLTHROUGH */
case DB_EVENT_WRITE_FAILED: /* FALLTHROUGH */
case DB_EVENT_PANIC:
env->errx(env, "evnet: DB_EVENT_PANIC, we got panic, recovery should be run.");
break;
case DB_EVENT_WRITE_FAILED:
env->errx(env, "event: DB_EVENT_WRITE_FAILED, I wrote to stable storage failed.");
break;
default:
envp->errx(envp, "ignoring event %d", which);
env->errx(env, "ignoring event %d", which);
}
}

static void bdb_err_callback(const DB_ENV *dbenv, const char *errpfx, const char *msg){
time_t curr_time = time(NULL);
char time_str[32];
strftime(time_str, 32, "%c", localtime(&curr_time));
fprintf(stderr, "[%s] [%s] \"%s\"\n", errpfx, time_str, msg);
}

static void bdb_msg_callback(const DB_ENV *dbenv, const char *msg){
time_t curr_time = time(NULL);
char time_str[32];
strftime(time_str, 32, "%c", localtime(&curr_time));
fprintf(stderr, "[%s] [%s] \"%s\"\n", PACKAGE, time_str, msg);
}

/* for atexit cleanup */
void bdb_chkpoint(void)
{
int ret = 0;
if (envp != NULL){
ret = envp->txn_checkpoint(envp, 0, 0, 0);
if (0 != ret){
fprintf(stderr, "envp->txn_checkpoint: %s\n", db_strerror(ret));
}else{
fprintf(stderr, "envp->txn_checkpoint: OK\n");
}
}
}

Expand All @@ -793,7 +830,6 @@ void bdb_db_close(void){
/* close the queue list db */
if (qlist_dbp != NULL) {
close_queue_db_list();

ret = qlist_dbp->close(qlist_dbp, 0);
if (0 != ret){
fprintf(stderr, "qlist_dbp->close: %s\n", db_strerror(ret));
Expand All @@ -816,19 +852,4 @@ void bdb_env_close(void){
fprintf(stderr, "envp->close: OK\n");
}
}
}

/* for atexit cleanup */
void bdb_chkpoint(void)
{
int ret = 0;
if (envp != NULL){
ret = envp->txn_checkpoint(envp, 0, 0, 0);
if (0 != ret){
fprintf(stderr, "envp->txn_checkpoint: %s\n", db_strerror(ret));
}else{
fprintf(stderr, "envp->txn_checkpoint: OK\n");
}
}
}

}
20 changes: 10 additions & 10 deletions configure
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.61 for memcacheq 0.1.0.
# Generated by GNU Autoconf 2.61 for memcacheq 0.1.1.
#
# Report bugs to <[email protected]>.
#
Expand Down Expand Up @@ -574,8 +574,8 @@ SHELL=${CONFIG_SHELL-/bin/sh}
# Identity of this package.
PACKAGE_NAME='memcacheq'
PACKAGE_TARNAME='memcacheq'
PACKAGE_VERSION='0.1.0'
PACKAGE_STRING='memcacheq 0.1.0'
PACKAGE_VERSION='0.1.1'
PACKAGE_STRING='memcacheq 0.1.1'
PACKAGE_BUGREPORT='[email protected]'

# Factoring default headers for most tests.
Expand Down Expand Up @@ -1206,7 +1206,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures memcacheq 0.1.0 to adapt to many kinds of systems.
\`configure' configures memcacheq 0.1.1 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
Expand Down Expand Up @@ -1272,7 +1272,7 @@ fi

if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of memcacheq 0.1.0:";;
short | recursive ) echo "Configuration of memcacheq 0.1.1:";;
esac
cat <<\_ACEOF
Expand Down Expand Up @@ -1363,7 +1363,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
memcacheq configure 0.1.0
memcacheq configure 0.1.1
generated by GNU Autoconf 2.61
Copyright (C) 1992, 1993, 1994, 1995, 1996, 1998, 1999, 2000, 2001,
Expand All @@ -1377,7 +1377,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by memcacheq $as_me 0.1.0, which was
It was created by memcacheq $as_me 0.1.1, which was
generated by GNU Autoconf 2.61. Invocation command line was
$ $0 $@
Expand Down Expand Up @@ -2067,7 +2067,7 @@ fi
# Define the identity of the package.
PACKAGE='memcacheq'
VERSION='0.1.0'
VERSION='0.1.1'
cat >>confdefs.h <<_ACEOF
Expand Down Expand Up @@ -5928,7 +5928,7 @@ exec 6>&1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by memcacheq $as_me 0.1.0, which was
This file was extended by memcacheq $as_me 0.1.1, which was
generated by GNU Autoconf 2.61. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
Expand Down Expand Up @@ -5981,7 +5981,7 @@ Report bugs to <[email protected]>."
_ACEOF
cat >>$CONFIG_STATUS <<_ACEOF
ac_cs_version="\\
memcacheq config.status 0.1.0
memcacheq config.status 0.1.1
configured by $0, generated by GNU Autoconf 2.61,
with options \\"`echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`\\"
Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
AC_PREREQ(2.61)
AC_INIT([memcacheq], [0.1.0], [[email protected]])
AC_INIT([memcacheq], [0.1.1], [[email protected]])
AM_INIT_AUTOMAKE([-Wall -Werror foreign])
AC_PROG_CC
AM_PROG_CC_C_O
Expand Down
38 changes: 20 additions & 18 deletions memcacheq.c
Original file line number Diff line number Diff line change
Expand Up @@ -2093,17 +2093,26 @@ static void remove_pidfile(const char *pid_file) {
/* for safely exit, make sure to do checkpoint*/
static void sig_handler(const int sig)
{
int ret;
if (sig != SIGTERM && sig != SIGQUIT && sig != SIGINT) {
return;
}
if (daemon_quit == 1){
return;
}
daemon_quit = 1;
fprintf(stderr, "Signal %d handled, memcacahedb is now exit..\n", sig);
fprintf(stderr, "Signal(%d) received, try to exit daemon gracefully..\n", sig);

/* exit event loop first */
fprintf(stderr, "exit event base...");
ret = event_base_loopexit(main_base, 0);
if (ret == 0)
fprintf(stderr, "done.\n");
else
fprintf(stderr, "error\n");

/* make sure deadlock detect loop is quit*/
sleep(2);

/* then we exit to call axexit */
exit(EXIT_SUCCESS);
}

int main (int argc, char **argv) {
Expand Down Expand Up @@ -2372,20 +2381,6 @@ int main (int argc, char **argv) {
}
}

/* register atexit callback function */
if (0 != atexit(bdb_env_close)) {
fprintf(stderr, "can not register close_env");
exit(EXIT_FAILURE);
}
if (0 != atexit(bdb_db_close)) {
fprintf(stderr, "can not register close_db");
exit(EXIT_FAILURE);
}
if (0 != atexit(bdb_chkpoint)) {
fprintf(stderr, "can not register db_checkpoint");
exit(EXIT_FAILURE);
}

/* here we init bdb env and open db */
bdb_env_init();
bdb_qlist_db_open();
Expand All @@ -2397,6 +2392,13 @@ int main (int argc, char **argv) {

/* enter the event loop */
event_base_loop(main_base, 0);

/* cleanup bdb staff */
fprintf(stderr, "try to clean up bdb resource...\n");
bdb_chkpoint();
bdb_db_close();
bdb_env_close();

/* remove the PID file if we're a daemon */
if (daemonize)
remove_pidfile(pid_file);
Expand Down
4 changes: 0 additions & 4 deletions memcacheq.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,6 @@ int bdb_put(char *key, size_t nkey, item *it);
void start_chkpoint_thread(void);
void start_memp_trickle_thread(void);
void start_dl_detect_thread(void);
void *bdb_chkpoint_thread __P((void *));
void *bdb_memp_trickle_thread __P((void *));
void *bdb_dl_detect_thread __P((void *));
void bdb_event_callback __P((DB_ENV *, u_int32_t, void *));
void bdb_db_close(void);
void bdb_env_close(void);
void bdb_chkpoint(void);
Expand Down
18 changes: 18 additions & 0 deletions test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php
$memcache;
$memcache = new Memcache;
$memcache->connect('localhost', 22201) or die ("Could not connect");

for ($i=0; $i < 100000; $i++) {
$msg = "aaaaaaaaaaaaaaa";
while(!$memcache->set('queue_001', $msg, false, 0)){
$memcache->close();
$memcache = new Memcache;
$memcache->connect('localhost', 22201) or die ("Could not connect");
// any logging here?
echo "reconnect...\n";
}
}

$memcache->close();
?>

0 comments on commit eb4948b

Please sign in to comment.