Skip to content

Commit

Permalink
recovery: simplify the procedure of recovery
Browse files Browse the repository at this point in the history
Before this patch, a special map of buckets to recover was
maintained during a master's lifecycle. But 1) the same buckets
can be obtained from the _bucket.index.status iterator, and 2) the
map does not work when a master switch occurs during a transfer, so
that a bucket arrives at the new master via replication after the
master is configured. Such buckets are never put into the map of
buckets to recover.

This patch gets rid of the map and uses _bucket.index.status
directly.
  • Loading branch information
Gerold103 committed Aug 22, 2018
1 parent 5292cfe commit 5f66134
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 104 deletions.
7 changes: 1 addition & 6 deletions test/rebalancer/bucket_ref.result
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,8 @@ finish_refs = true
while vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end
---
...
vshard.storage.buckets_info(1)
while box.space._bucket:get{1} do fiber.sleep(0.01) end
---
- 1:
status: sent
ro_lock: true
destination: <replicaset_1>
id: 1
...
_ = test_run:switch('box_1_a')
---
Expand Down
2 changes: 1 addition & 1 deletion test/rebalancer/bucket_ref.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ fiber.sleep(0.2)
vshard.storage.buckets_info(1)
finish_refs = true
while vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end
vshard.storage.buckets_info(1)
while box.space._bucket:get{1} do fiber.sleep(0.01) end
_ = test_run:switch('box_1_a')
vshard.storage.buckets_info(1)

Expand Down
2 changes: 1 addition & 1 deletion test/rebalancer/receiving_bucket.result
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ while box.space._bucket:get{101}.status ~= vshard.consts.BUCKET.ACTIVE do vshard
...
box.space._bucket:get{101}
---
- [101, 'active', '<replicaset_1>']
- [101, 'active']
...
_ = test_run:switch('box_1_a')
---
Expand Down
16 changes: 0 additions & 16 deletions test/rebalancer/restart_during_rebalancing.result
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,6 @@ vshard.storage.info().bucket
pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
---
- []
...
check_consistency()
---
- true
Expand All @@ -316,10 +312,6 @@ vshard.storage.info().bucket
pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
---
- []
...
check_consistency()
---
- true
Expand All @@ -337,10 +329,6 @@ vshard.storage.info().bucket
pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
---
- []
...
check_consistency()
---
- true
Expand All @@ -358,10 +346,6 @@ vshard.storage.info().bucket
pinned: 0
sending: 0
...
vshard.storage.internal.buckets_to_recovery
---
- []
...
check_consistency()
---
- true
Expand Down
4 changes: 0 additions & 4 deletions test/rebalancer/restart_during_rebalancing.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,15 @@ util.check_loading_result()

test_run:switch('fullbox_1_a')
vshard.storage.info().bucket
vshard.storage.internal.buckets_to_recovery
check_consistency()
test_run:switch('fullbox_2_a')
vshard.storage.info().bucket
vshard.storage.internal.buckets_to_recovery
check_consistency()
test_run:switch('fullbox_3_a')
vshard.storage.info().bucket
vshard.storage.internal.buckets_to_recovery
check_consistency()
test_run:switch('fullbox_4_a')
vshard.storage.info().bucket
vshard.storage.internal.buckets_to_recovery
check_consistency()

test_run:switch('default')
Expand Down
16 changes: 8 additions & 8 deletions test/storage/recovery.result
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ _ = test_run:switch('storage_2_a')
_bucket = box.space._bucket
---
...
_bucket:replace{2, vshard.consts.BUCKET.ACTIVE, util.replicasets[1]}
_bucket:replace{2, vshard.consts.BUCKET.ACTIVE}
---
- [2, 'active', '<replicaset_1>']
- [2, 'active']
...
_bucket:replace{3, vshard.consts.BUCKET.SENDING, util.replicasets[1]}
---
Expand Down Expand Up @@ -87,7 +87,7 @@ _ = test_run:switch('storage_2_a')
...
_bucket:select{}
---
- - [2, 'active', '<replicaset_1>']
- - [2, 'active']
- [3, 'sending', '<replicaset_1>']
...
_ = test_run:switch('storage_1_a')
Expand Down Expand Up @@ -129,9 +129,9 @@ _bucket:replace{1, vshard.consts.BUCKET.SENDING, util.replicasets[2]}
_ = test_run:switch('storage_2_a')
---
...
_bucket:replace{1, vshard.consts.BUCKET.ACTIVE, util.replicasets[1]}
_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
---
- [1, 'active', '<replicaset_1>']
- [1, 'active']
...
_ = test_run:switch('default')
---
Expand Down Expand Up @@ -180,9 +180,9 @@ _bucket = box.space._bucket
...
_bucket:select{}
---
- - [1, 'active', '<replicaset_1>']
- [2, 'active', '<replicaset_1>']
- [3, 'active', '<replicaset_1>']
- - [1, 'active']
- [2, 'active']
- [3, 'active']
...
--
-- Test a case when a bucket is sending in one place and garbage
Expand Down
4 changes: 2 additions & 2 deletions test/storage/recovery.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ _bucket:replace{3, vshard.consts.BUCKET.RECEIVING, util.replicasets[2]}

_ = test_run:switch('storage_2_a')
_bucket = box.space._bucket
_bucket:replace{2, vshard.consts.BUCKET.ACTIVE, util.replicasets[1]}
_bucket:replace{2, vshard.consts.BUCKET.ACTIVE}
_bucket:replace{3, vshard.consts.BUCKET.SENDING, util.replicasets[1]}

_ = test_run:cmd('stop server storage_1_a')
Expand Down Expand Up @@ -54,7 +54,7 @@ while _bucket:count() ~= 2 do vshard.storage.recovery_wakeup() fiber.sleep(0.1)
_ = test_run:switch('storage_1_a')
_bucket:replace{1, vshard.consts.BUCKET.SENDING, util.replicasets[2]}
_ = test_run:switch('storage_2_a')
_bucket:replace{1, vshard.consts.BUCKET.ACTIVE, util.replicasets[1]}
_bucket:replace{1, vshard.consts.BUCKET.ACTIVE}
_ = test_run:switch('default')
_ = test_run:cmd('stop server storage_2_a')
_ = test_run:cmd('stop server storage_1_a')
Expand Down
86 changes: 84 additions & 2 deletions test/storage/recovery_errinj.result
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,92 @@ _bucket:get{1}
_ = test_run:switch('storage_1_a')
---
...
fiber = require('fiber')
while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
---
...
while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end
--
-- Test a case when a bucket was being sent from a source and
-- then the master was switched. In such a case the sending
-- fails and the new master should recover the bucket.
--
_ = test_run:switch('storage_2_a')
---
...
box.error.injection.set("ERRINJ_WAL_DELAY", true)
---
- ok
...
err = nil
---
...
ok = nil
---
...
f = fiber.create(function() ok, err = vshard.storage.bucket_send(1, util.replicasets[1]) end)
---
...
while not vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end
---
...
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_a].master = false
---
...
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_b].master = true
---
...
vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
---
...
box.space._bucket:get{1}
---
- [1, 'sending', '<replicaset_1>']
...
_ = test_run:switch('storage_2_b')
---
...
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_a].master = false
---
...
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_b].master = true
---
...
vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_b)
---
...
box.space._bucket:get{1}
---
- [1, 'active']
...
_ = test_run:switch('storage_2_a')
---
...
box.error.injection.set("ERRINJ_WAL_DELAY", false)
---
- ok
...
while not err do fiber.sleep(0.01) end
---
...
ok, err
---
- false
- Can't modify data because this instance is in read-only mode.
...
box.space._bucket:get{1}
---
- [1, 'sending', '<replicaset_1>']
...
_ = test_run:switch('storage_2_b')
---
...
while box.space._bucket:get{1}.status ~= vshard.consts.BUCKET.SENDING do fiber.sleep(0.01) end
---
...
box.space._bucket:get{1}
---
- [1, 'sending', '<replicaset_1>']
...
while box.space._bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.01) end
---
...
_ = test_run:switch("default")
Expand Down
33 changes: 32 additions & 1 deletion test/storage/recovery_errinj.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,40 @@ while _bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do fiber.sleep(0.01)
_bucket:get{1}

_ = test_run:switch('storage_1_a')
fiber = require('fiber')
while _bucket:count() ~= 0 do vshard.storage.recovery_wakeup() fiber.sleep(0.1) end

--
-- Test a case when a bucket was being sent from a source and
-- then the master was switched. In such a case the sending
-- fails and the new master should recover the bucket.
--
_ = test_run:switch('storage_2_a')
box.error.injection.set("ERRINJ_WAL_DELAY", true)
err = nil
ok = nil
f = fiber.create(function() ok, err = vshard.storage.bucket_send(1, util.replicasets[1]) end)
while not vshard.storage.buckets_info(1)[1].rw_lock do fiber.sleep(0.01) end
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_a].master = false
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_b].master = true
vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_a)
box.space._bucket:get{1}

_ = test_run:switch('storage_2_b')
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_a].master = false
cfg.sharding[util.replicasets[2]].replicas[util.name_to_uuid.storage_2_b].master = true
vshard.storage.cfg(cfg, util.name_to_uuid.storage_2_b)
box.space._bucket:get{1}

_ = test_run:switch('storage_2_a')
box.error.injection.set("ERRINJ_WAL_DELAY", false)
while not err do fiber.sleep(0.01) end
ok, err
box.space._bucket:get{1}
_ = test_run:switch('storage_2_b')
while box.space._bucket:get{1}.status ~= vshard.consts.BUCKET.SENDING do fiber.sleep(0.01) end
box.space._bucket:get{1}
while box.space._bucket:get{1}.status ~= vshard.consts.BUCKET.ACTIVE do vshard.storage.recovery_wakeup() fiber.sleep(0.01) end

_ = test_run:switch("default")
test_run:drop_cluster(REPLICASET_2)
test_run:drop_cluster(REPLICASET_1)
Expand Down
Loading

0 comments on commit 5f66134

Please sign in to comment.