Skip to content

Commit

Permalink
storage: fix moved buckets check
Browse files Browse the repository at this point in the history
'moved_buckets' function would treat as "moved" all the buckets
which are not strictly ACTIVE. But that isn't optimal.

Also the 'moved_buckets' function assumed that the set of local
buckets stays unchanged between the start and the end of ref
creation. That isn't true - buckets can move away while the ref is
waiting.

The commit fixes both issues.

Firstly, when a bucket is SENDING, returning an error right away
isn't good. The router would just keep retrying then, without any
progress. The bucket is actually here, it is not moved yet.

Better let the storage try to take a ref. Then one of two outcomes
is possible:
- It waits without useless active retries. And then SENDING fails
    and becomes ACTIVE. Ref is taken, all good.
- It waits without useless active retries. SENDING turns into
    SENT. Ref is taken for the other buckets, and this one is
    reported as moved.

Secondly, after a ref is taken, the not-moved buckets could become
moved. Need to re-check them before returning the ref. Luckily,
the storage can use bucket_generation to avoid this double-check
when nothing changed in _bucket.

NO_DOC=bugfix
  • Loading branch information
Gerold103 committed Oct 3, 2024
1 parent 1e408a0 commit ca08b7e
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 20 deletions.
170 changes: 164 additions & 6 deletions test/storage-luatest/storage_1_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ test_group.test_bucket_skip_validate = function(g)
end)
end

test_group.test_ref_with_buckets = function(g)
test_group.test_ref_with_buckets_basic = function(g)
g.replica_1_a:exec(function()
local lref = require('vshard.storage.ref')
local res, err, _
Expand Down Expand Up @@ -277,7 +277,11 @@ test_group.test_ref_with_buckets = function(g)
ilt.assert_equals(err, nil)
ilt.assert_equals(res, {
rid = rid,
moved = {bucket_count + 1, bucket_count + 2, bucket_count + 3}
moved = {
{id = bucket_count + 1},
{id = bucket_count + 2},
{id = bucket_count + 3},
}
})
_, err = ivshard.storage._call('storage_unref', rid)
ilt.assert_equals(err, nil)
Expand All @@ -290,22 +294,173 @@ test_group.test_ref_with_buckets = function(g)
{bucket_count + 1, bucket_count + 2}
)
ilt.assert_equals(err, nil)
ilt.assert_equals(res, {moved = {bucket_count + 1, bucket_count + 2}})
ilt.assert_equals(res, {moved = {
{id = bucket_count + 1},
{id = bucket_count + 2},
}})
ilt.assert_equals(lref.count, 0)
end)
end

-- Verify that 'storage_ref_with_buckets' respects its timeout when some
-- bucket on the storage is not writable (SENDING): the ref has to wait
-- and eventually fail with a timeout error, leaving no refs behind.
test_group.test_ref_with_buckets_timeout = function(g)
    g.replica_1_a:exec(function()
        local lref = require('vshard.storage.ref')
        local rid = 42
        local bids = _G.get_n_buckets(2)
        --
        -- Timeout when some buckets aren't writable. Doesn't have to be the
        -- same buckets as for moving.
        --
        -- Pause recovery so it doesn't turn the artificial SENDING bucket
        -- back into ACTIVE on its own.
        _G.bucket_recovery_pause()
        box.space._bucket:update(
            {bids[1]}, {{'=', 2, ivconst.BUCKET.SENDING}})
        local res, err = ivshard.storage._call(
            'storage_ref_with_buckets', rid, 0.01, {bids[2]})
        -- Restore the bucket and recovery before asserting, so the storage
        -- is left clean even if an assertion below fails.
        box.space._bucket:update(
            {bids[1]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
        _G.bucket_recovery_continue()
        t.assert_str_contains(err.message, 'Timeout exceeded')
        ilt.assert_equals(res, nil)
        -- The failed ref must not be leaked.
        ilt.assert_equals(lref.count, 0)
    end)
end

-- When a requested bucket is already SENT, 'storage_ref_with_buckets'
-- must report it as moved together with its last known destination and
-- status, and must not create a ref.
test_group.test_ref_with_buckets_return_last_known_dst = function(g)
    g.replica_1_a:exec(function()
        local lref = require('vshard.storage.ref')
        local luuid = require('uuid')
        local ref_id = 42
        local bucket_id = _G.get_first_bucket()
        local dst_id = luuid.str()
        -- The bucket has to walk through a legal status sequence:
        -- ACTIVE -> SENDING -> SENT. Any other transition is validated
        -- and not allowed.
        _G.bucket_recovery_pause()
        _G.bucket_gc_pause()
        box.space._bucket:update({bucket_id},
            {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, dst_id}})
        box.space._bucket:update({bucket_id},
            {{'=', 2, ivconst.BUCKET.SENT}})
        local res, err = ivshard.storage._call(
            'storage_ref_with_buckets', ref_id, iwait_timeout, {bucket_id})
        ilt.assert_equals(err, nil)
        local expected_bucket = {
            id = bucket_id,
            dst = dst_id,
            status = ivconst.BUCKET.SENT,
        }
        ilt.assert_equals(res, {moved = {expected_bucket}})
        ilt.assert_equals(lref.count, 0)
        -- Cleanup: let GC drop the SENT bucket, then recreate it as ACTIVE.
        _G.bucket_gc_continue()
        _G.bucket_gc_wait()
        box.space._bucket:insert({bucket_id, ivconst.BUCKET.RECEIVING})
        box.space._bucket:update({bucket_id},
            {{'=', 2, ivconst.BUCKET.ACTIVE}})
        _G.bucket_recovery_continue()
    end)
end

-- A ref request blocks on a SENDING bucket. While it waits, one of the
-- buckets it actually asked for gets moved away. The ref must still be
-- taken (some requested buckets remain), and the moved one must be
-- reported with its destination.
test_group.test_ref_with_buckets_move_part_while_referencing = function(g)
g.replica_1_a:exec(function()
local lref = require('vshard.storage.ref')
local _
local rid = 42
local bids = _G.get_n_buckets(3)
local luuid = require('uuid')
local id = luuid.str()
--
-- Was not moved until ref, but moved while ref was waiting.
--
-- Pause recovery and GC so bucket statuses only change when this test
-- changes them.
_G.bucket_recovery_pause()
_G.bucket_gc_pause()
-- Block the refs for a while: a SENDING bucket makes the storage
-- non-writable, so storage_ref_with_buckets has to wait.
box.space._bucket:update(
{bids[3]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}})
-- Start referencing in a separate fiber; remember its session id so
-- the ref can be deleted from this fiber during cleanup.
local session_id
local f = ifiber.new(function()
session_id = box.session.id()
return ivshard.storage._call('storage_ref_with_buckets', rid,
iwait_timeout, {bids[1], bids[2]})
end)
f:set_joinable(true)
-- While waiting, one of the target buckets starts moving.
box.space._bucket:update(
{bids[2]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}})
-- Now they are moved. GC will drop the SENT buckets, which unblocks
-- the waiting ref.
box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.SENT}})
box.space._bucket:update({bids[3]}, {{'=', 2, ivconst.BUCKET.SENT}})
_G.bucket_gc_continue()
_G.bucket_gc_wait()
local ok, res, err = f:join()
t.assert(ok)
t.assert_equals(err, nil)
-- Only the requested-and-moved bucket is reported, with its dst.
ilt.assert_equals(res, {
moved = {{id = bids[2], dst = id}},
rid = rid,
})
-- Ref was done, because at least one bucket was ok.
ilt.assert_equals(lref.count, 1)
-- Cleanup.
_, err = lref.del(rid, session_id)
ilt.assert_equals(err, nil)
ilt.assert_equals(lref.count, 0)
-- Recreate the moved buckets via the legal RECEIVING -> ACTIVE path.
box.space._bucket:insert({bids[2], ivconst.BUCKET.RECEIVING})
box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
box.space._bucket:insert({bids[3], ivconst.BUCKET.RECEIVING})
box.space._bucket:update({bids[3]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
_G.bucket_recovery_continue()
end)
end

-- Same scenario as the 'move_part' test, but ALL of the requested
-- buckets are moved away while the ref is waiting. Then the ref must
-- not be kept at all: every requested bucket is reported as moved and
-- lref.count stays 0.
test_group.test_ref_with_buckets_move_all_while_referencing = function(g)
g.replica_1_a:exec(function()
local lref = require('vshard.storage.ref')
local rid = 42
local bids = _G.get_n_buckets(3)
local luuid = require('uuid')
local id = luuid.str()
--
-- Now same, but all buckets were moved. No ref should be left then.
--
-- Pause recovery and GC so bucket statuses only change when this test
-- changes them.
_G.bucket_recovery_pause()
_G.bucket_gc_pause()
-- Block the refs for a while: a SENDING bucket makes the storage
-- non-writable, so storage_ref_with_buckets has to wait.
box.space._bucket:update(
{bids[3]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}})
-- Start referencing in a separate fiber.
local f = ifiber.new(function()
return ivshard.storage._call('storage_ref_with_buckets', rid,
iwait_timeout, {bids[1], bids[2]})
end)
f:set_joinable(true)
-- While waiting, all the target buckets start moving.
box.space._bucket:update(
{bids[1]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}})
box.space._bucket:update(
{bids[2]}, {{'=', 2, ivconst.BUCKET.SENDING}, {'=', 3, id}})
-- Now they are moved.
box.space._bucket:update({bids[1]}, {{'=', 2, ivconst.BUCKET.SENT}})
box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.SENT}})
-- And the other bucket doesn't matter. Can revert it back.
box.space._bucket:update({bids[3]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
-- GC drops the SENT buckets, which unblocks the waiting ref.
_G.bucket_gc_continue()
_G.bucket_gc_wait()
local ok, res, err = f:join()
t.assert(ok)
t.assert_equals(err, nil)
-- No 'rid' in the result: the ref was dropped by the storage itself.
ilt.assert_equals(res, {
moved = {
{id = bids[1], dst = id},
{id = bids[2], dst = id},
}
})
-- Ref was not done, because all the buckets moved out.
ilt.assert_equals(lref.count, 0)
-- Cleanup. Recreate the moved buckets via RECEIVING -> ACTIVE.
box.space._bucket:insert({bids[1], ivconst.BUCKET.RECEIVING})
box.space._bucket:update({bids[1]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
box.space._bucket:insert({bids[2], ivconst.BUCKET.RECEIVING})
box.space._bucket:update({bids[2]}, {{'=', 2, ivconst.BUCKET.ACTIVE}})
_G.bucket_recovery_continue()
end)
end

Expand All @@ -324,6 +479,9 @@ test_group.test_moved_buckets = function(g)
{_G.get_first_bucket(), bucket_count + 1, bucket_count + 2}
)
ilt.assert_equals(err, nil)
ilt.assert_equals(res, {moved = {bucket_count + 1, bucket_count + 2}})
ilt.assert_equals(res, {moved = {
{id = bucket_count + 1},
{id = bucket_count + 2},
}})
end)
end
14 changes: 11 additions & 3 deletions vshard/router/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -975,9 +975,17 @@ local function router_ref_storage_by_buckets(router, bucket_ids, timeout)
err_id = id
goto fail
end
for _, bucket_id in pairs(res.moved) do
bucket_reset(router, bucket_id)
table.insert(bucket_ids, bucket_id)
for _, bucket in pairs(res.moved) do
local bid = bucket.id
local dst = bucket.dst
-- 'Reset' regardless of 'set'. So as not to
-- bother with 'set' errors. If it fails, then
-- won't matter. It is a best-effort thing.
bucket_reset(router, bid)
if dst ~= nil then
bucket_set(router, bid, dst)
end
table.insert(bucket_ids, bid)
end
if res.rid then
assert(not replicasets_to_map[id])
Expand Down
45 changes: 34 additions & 11 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3188,17 +3188,38 @@ end
-- not present on the current instance.
--
--
-- Check which of the given buckets are not available on this storage.
-- A bucket counts as present while it is ACTIVE, PINNED, or still
-- SENDING - a send can fail and roll the bucket back to ACTIVE, so it
-- is not gone yet. Anything else, including a missing bucket, is
-- reported as moved, together with the last known destination (from
-- the _bucket tuple, or from the route map when the tuple is gone)
-- and status, when available.
--
local function storage_moved_buckets(bucket_ids)
    local allstatus = consts.BUCKET
    local moved_buckets = {}
    for _, bucket_id in pairs(bucket_ids) do
        local bucket = box.space._bucket:get{bucket_id}
        local is_moved
        if not bucket then
            is_moved = true
        else
            local status = bucket.status
            is_moved = status ~= allstatus.ACTIVE and
                status ~= allstatus.PINNED and
                status ~= allstatus.SENDING
        end
        if is_moved then
            table.insert(moved_buckets, {
                id = bucket_id,
                -- Fall back to the route map when the tuple is absent
                -- or carries no destination.
                dst = bucket and bucket.destination or M.route_map[bucket_id],
                -- nil when the bucket tuple no longer exists.
                status = bucket and bucket.status,
            })
        end
    end
    return {moved = moved_buckets}
end

--
-- Drop a storage ref from the current box session. Is used as a part of
-- Map-Reduce API.
--
local function storage_unref(rid)
    local session_id = box.session.id()
    return lref.del(rid, session_id)
end

--
-- Bind a new storage ref to the current box session and check
-- that all the buckets are present. Is used as a part of the
Expand All @@ -3219,21 +3240,23 @@ local function storage_ref_with_buckets(rid, timeout, bucket_ids)
-- ref.
return {rid = nil, moved = moved}
end
local bucket_generation = M.bucket_generation
local ok, err = storage_ref(rid, timeout)
if not ok then
return nil, err
end
if M.bucket_generation ~= bucket_generation then
-- Need to redo it. Otherwise there is a risk that some buckets were
-- moved while waiting for the ref.
moved = storage_moved_buckets(bucket_ids).moved
if #moved == #bucket_ids then
storage_unref(rid)
rid = nil
end
end
return {rid = rid, moved = moved}
end

--
-- Drop a storage ref from the current box session. Is used as a part of
-- Map-Reduce API. Delegates to the ref registry, keyed by the caller's
-- session id.
--
local function storage_unref(rid)
    return lref.del(rid, box.session.id())
end

--
-- Execute a user's function under an infinite storage ref protecting from
-- bucket moves. The ref should exist before, and is deleted after, regardless
Expand Down

0 comments on commit ca08b7e

Please sign in to comment.