Skip to content
This repository has been archived by the owner on Oct 28, 2023. It is now read-only.

Commit

Permalink
add cursor for search #82
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Sep 20, 2014
1 parent 3151dbc commit 7a6771e
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 166 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ LIB_OBJS = \
./tree/compress/compress.o \
./tree/compress/snappy.o \
./tree/tree-func.o \
./tree/cursor.o \
./tree/flusher.o \
./tree/msgpack.o \
./tree/block.o \
./tree/layout.o \
./tree/block.o \
./tree/hdr.o \
./tree/node.o \
./tree/tree.o \
Expand Down
210 changes: 71 additions & 139 deletions tree/cursor.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

#include "cache.h"
#include "leaf.h"
#include "msgbuf.h"
#include "compare-func.h"
#include "lmb.h"
#include "cursor.h"

/**
Expand All @@ -24,28 +23,25 @@
*/
int _search_in_which_child(struct search *so, struct node *node)
{
int lo;
int hi;
int mi;
int c;
int lo, hi, mi, cmp;
int children;
int childnum = 0;

/* i am leaf */
if (node->height == 0)
return 0;

children = node->u.n.n_children;
children = node->n_children;
nassert(children >= 2);

lo = 0;
hi = children - 1;
while (lo < hi) {
/* mi integer overflow never happens */
mi = (lo + hi) / 2;
c = so->direction_compare_func(so, &node->u.n.pivots[mi]);
if (((so->direction == SEARCH_FORWARD) && c) ||
((so->direction == SEARCH_BACKWARD) && !c))
cmp = so->direction_compare_func(so, &node->pivots[mi]);
if (((so->direction == SEARCH_FORWARD) && cmp) ||
((so->direction == SEARCH_BACKWARD) && !cmp))
hi = mi;
else
lo = mi + 1;
Expand All @@ -56,19 +52,18 @@ int _search_in_which_child(struct search *so, struct node *node)
* detecting whether we should move to the prev/next pivot
* make a new root-to-leaf path
*/
int cmp;
struct msg *pivot;

switch (so->direction) {
case SEARCH_FORWARD:
pivot = &node->u.n.pivots[childnum];
pivot = &node->pivots[childnum];
cmp = so->pivotbound_compare_func(so, pivot);

while (childnum < (children - 1) && cmp >= 0)
childnum++;
break;
case SEARCH_BACKWARD:
pivot = &node->u.n.pivots[childnum - 1];
pivot = &node->pivots[childnum - 1];
cmp = so->pivotbound_compare_func(so, pivot);

while (childnum > 0 && cmp <= 0)
Expand All @@ -90,19 +85,19 @@ void _save_pivot_bound(struct search *so, struct node *n, int child_searched)

int p = (so->direction == SEARCH_FORWARD) ?
child_searched : child_searched - 1;
if (p >= 0 && p < (int)(n->u.n.n_children - 1)) {
if (p >= 0 && p < (int)(n->n_children - 1)) {
if (so->pivot_bound)
msgfree(so->pivot_bound);
so->pivot_bound = msgdup(&n->u.n.pivots[p]);
so->pivot_bound = msgdup(&n->pivots[p]);
}
}

void ancestors_append(struct cursor *cur, struct fifo *fifo)
void ancestors_append(struct cursor *cur, struct nmb *nmb)
{
struct ancestors *next_ance;

next_ance = xcalloc(1, sizeof(*next_ance));
next_ance->v = fifo;
next_ance->v = nmb;
next_ance->next = cur->ances;
cur->ances = next_ance;
cur->ances_size++;
Expand All @@ -116,144 +111,81 @@ void ancestors_append(struct cursor *cur, struct fifo *fifo)
* d) repeatable: cur_id >= id, and id not in live snapshot list
*
*/
int _get_visible_value(struct cursor *cur, struct msgbuf_iter *iter)
int _cursor_get_values_from_leafentry(struct cursor *cur, void *le)
{
TXNID id;
TXNID curid = TXNID_NONE;
(void)cur;
(void)le;
nassert(0);

int visi = 0;
int islive;

struct txnmgr *tmgr = NULL;
struct txnid_snapshot *lives;

if (cur->txn) {
tmgr = cur->txn->logger->txnmgr;
curid = cur->txn->root_parent_txnid;
}
while (msgbuf_internal_iter_reverse(iter)) {
/* in non-transaction */
if (!cur->txn) {
visi = 1;
goto GOT_VALUE;
}

/* in transaction */
switch (cur->iso_type) {
case TXN_ISO_SERIALIZABLE:
case TXN_ISO_READ_UNCOMMITTED:
visi = 1;
goto GOT_VALUE;
case TXN_ISO_READ_COMMITTED:
id = iter->xidpair.parent_xid;
lives = tmgr->live_root_txnids;
islive = txnmgr_txn_islive(lives, id);
if (!islive && (curid >= id)) {
visi = 1;
goto GOT_VALUE;
}
break;
case TXN_ISO_REPEATABLE:
id = iter->xidpair.parent_xid;
lives = cur->txn->txnid_clone;
islive = txnmgr_txn_islive(lives, id);
if (!islive && (curid >= id)) {
visi = 1;
goto GOT_VALUE;
}
break;
}
}

GOT_VALUE:
if (visi) {
cur->valid = 1;
cur->msgtype = iter->type;
cur->key = iter->key;
cur->val = iter->val;
}

return visi;
return -1;
}

int _search_leaf(struct cursor *cur, struct search *so, struct node *leaf)
{
int found;
int ret = CURSOR_EOF;
struct msgbuf_iter iter;
struct lmb *lmb;
struct mb_iter iter;
struct leafentry *le;
struct pma_coord coord;

/* first to apply all msgs to leaf msgbuf */
/* 1) apply all msgs to leaf msgbuf */
leaf_apply_ancestors(leaf, cur->ances);

/* init iter */
iter.key = cur->key;
msgbuf_iter_init(&iter, leaf->u.l.buffer);
/* 2) init leaf iterator, TODO: which msgbuf to read */
lmb = leaf->parts[0].ptr.u.leaf->buffer;
mb_iter_init(&iter, lmb->pma);

/* 3) do search */
switch (so->gap) {
case GAP_ZERO:
nassert(so->direction == SEARCH_FORWARD);
msgbuf_iter_seek(&iter, so->key);
if (msgbuf_iter_valid(&iter)) {
if (_get_visible_value(cur, &iter) &&
iter.type != MSG_DELETE) {
ret = CURSOR_CONTINUE;
goto RET;
found = lmb_find_zero(lmb, &cur->key, &le, &coord);
if (found) {
mb_iter_reset(&iter, &coord);
if (mb_iter_valid(&iter)) {
_cursor_get_values_from_leafentry(cur, iter.base);
}
} else {
cur->valid = 0;
ret = CURSOR_EOF;
}
ret = CURSOR_EOF;
break;
case GAP_POSI:
nassert(so->direction == SEARCH_FORWARD);
if (!so->key)
msgbuf_iter_seektofirst(&iter);
else
msgbuf_iter_seek(&iter, so->key);
while (msgbuf_iter_valid(&iter)) {
msgbuf_iter_next(&iter);
if (msgbuf_iter_valid(&iter)) {
if (_get_visible_value(cur, &iter) &&
iter.type != MSG_DELETE) {
ret = CURSOR_CONTINUE;
goto RET;
found = lmb_find_plus(lmb, &cur->key, &le, &coord);
if (found) {
mb_iter_reset(&iter, &coord);
POSI_RETRY:
if (mb_iter_valid(&iter)) {
int got = _cursor_get_values_from_leafentry(cur, iter.base);
if (!got) {
mb_iter_next(&iter);
goto POSI_RETRY;
} else {
ret = CURSOR_EOF;
}
} else {
goto POSI_RET;
ret = CURSOR_CONTINUE;
}
}
POSI_RET:
cur->valid = 0;
ret = CURSOR_EOF;
break;
case GAP_NEGA:
nassert(so->direction == SEARCH_BACKWARD);
if (!so->key)
msgbuf_iter_seektolast(&iter);
else
msgbuf_iter_seek(&iter, so->key);
while (msgbuf_iter_valid(&iter)) {
msgbuf_iter_prev(&iter);
if (msgbuf_iter_valid(&iter)) {
if (_get_visible_value(cur, &iter) &&
iter.type != MSG_DELETE) {
ret = CURSOR_CONTINUE;
goto RET;
found = lmb_find_minus(lmb, &cur->key, &le, &coord);
if (found) {
mb_iter_reset(&iter, &coord);
NEGA_RETRY:
if (mb_iter_valid(&iter)) {
int got = _cursor_get_values_from_leafentry(cur, iter.base);
if (!got) {
mb_iter_prev(&iter);
goto NEGA_RETRY;
} else {
ret = CURSOR_EOF;
}
} else {
goto NEGA_RET;
ret = CURSOR_CONTINUE;
}
}
NEGA_RET:
cur->valid = 0;
ret = CURSOR_EOF;
break;
default:
cur->valid = 0;
ret = CURSOR_EOF;
break;
}

RET:
return ret;
}

Expand All @@ -266,15 +198,15 @@ int _search_child(struct cursor *cur,
int childnum)
{
int ret;
int child_to_search;
NID child_nid;
int child_to_search;
struct node *child;

nassert(n->height > 0);
ancestors_append(cur, n->u.n.parts[childnum].buffer);
ancestors_append(cur, n->parts[childnum].ptr.u.nonleaf->buffer);

child_nid = n->u.n.parts[childnum].child_nid;
if (cache_get_and_pin(cur->tree->cf, child_nid, &child, L_READ) < 0) {
child_nid = n->parts[childnum].child_nid;
if (!cache_get_and_pin(cur->tree->cf, child_nid, (void**)&child, L_READ)) {
__ERROR("cache get node error, nid [%" PRIu64 "]", child_nid);

return NESS_ERR;
Expand All @@ -284,7 +216,7 @@ int _search_child(struct cursor *cur,
ret = _search_node(cur, so, child, child_to_search);

/* unpin */
cache_unpin(cur->tree->cf, child);
cache_unpin(cur->tree->cf, child->cpair, make_cpair_attr(child));

return ret;
}
Expand All @@ -304,7 +236,7 @@ int _search_node(struct cursor *cur,
_save_pivot_bound(so, n, child_to_search);
switch (so->direction) {
case SEARCH_FORWARD:
children = (int)(n->u.n.n_children - 1);
children = (int)(n->n_children - 1);
if (child_to_search < children)
r = CURSOR_TRY_AGAIN;
break;
Expand Down Expand Up @@ -351,11 +283,10 @@ void _tree_search(struct cursor * cur, struct search * so)

t = cur->tree;

try_again:
TRY_AGAIN:
root_nid = t->hdr->root_nid;
if (cache_get_and_pin(t->cf, root_nid, &root, L_READ) < 0) {
__ERROR("cache get root node error, nid [%" PRIu64 "]",
root_nid);
if (!cache_get_and_pin(t->cf, root_nid, (void**)&root, L_READ)) {
__ERROR("cache get root node error, nid [%" PRIu64 "]", root_nid);

return;
}
Expand All @@ -364,13 +295,14 @@ void _tree_search(struct cursor * cur, struct search * so)
r = _search_node(cur, so, root, child_to_search);

/* unpin */
cache_unpin(t->cf, root);
cache_unpin(t->cf, root->cpair, make_cpair_attr(root));

switch (r) {
case CURSOR_CONTINUE:
case CURSOR_CONTINUE: /* got the end of leaf */
goto TRY_AGAIN;
break;
case CURSOR_TRY_AGAIN:
goto try_again;
case CURSOR_TRY_AGAIN: /* got the end of node */
goto TRY_AGAIN;
break;
case CURSOR_EOF:
break;
Expand Down
8 changes: 4 additions & 4 deletions tree/cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ struct cursor {
MSN msn;
int valid;
msgtype_t msgtype;
struct msg key; /* return key */
struct msg val; /* return val */
struct tree *tree; /* index */
void *extra; /* extra(no-use) */
struct msg key; /* return key */
struct msg val; /* return val */
struct tree *tree; /* index */
void *extra; /* extra(no-use) */
TXN_ISOLATION_TYPE iso_type;

/* root->to->leaf bsm path */
Expand Down
2 changes: 1 addition & 1 deletion tree/flusher.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void _flush_buffer_to_child(struct tree *t, struct node *child, struct nmb *buf)
struct mb_iter iter;

mb_iter_init(&iter, buf->pma);
while (mb_iterate(&iter)) {
while (mb_iter_next(&iter)) {
/* TODO(BohuTANG): check msn */
struct nmb_values nvalues;

Expand Down
2 changes: 1 addition & 1 deletion tree/leaf.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void _apply_msg_to_child(struct node *parent,
nmb_get_right_coord(buffer, right, &coord_right);

mb_iter_init(&iter, buffer->pma);
while (mb_iterate_on_range(&iter, &coord_left, &coord_right)) {
while (mb_iter_on_range(&iter, &coord_left, &coord_right)) {
struct nmb_values values;

nmb_get_values(&iter, &values);
Expand Down
Loading

0 comments on commit 7a6771e

Please sign in to comment.