Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

driver: implement ready space for vinyl engine #236

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 139 additions & 76 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ function tube.new(space, on_task_change, opts)

local space_ready_buffer_name = space.name .. "_ready_buffer"
local space_ready_buffer = box.space[space_ready_buffer_name]
-- Feature implemented only for memtx engine for now.
-- https://github.com/tarantool/queue/issues/230.
if opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER and opts.engine == 'vinyl' then
error(string.format('"%s" storage mode cannot be used with vinyl engine',
tube.STORAGE_MODE_READY_BUFFER))
end

local ready_space_mode = (opts.storage_mode == tube.STORAGE_MODE_READY_BUFFER)
if ready_space_mode then
Expand Down Expand Up @@ -167,6 +161,10 @@ local function commit()
box.commit()
end

local function rollback()
box.rollback()
end

local function empty()
end

Expand All @@ -179,14 +177,31 @@ local function begin_if_not_in_txn()

if not box.is_in_txn() then
box.begin(transaction_opts)
return commit
return commit, rollback
else
return empty
return empty, empty
end
end

-- Try commiting operations until success. This is required for 'vinyl' engine.
-- In case of a transaction conflict for 'vinyl' we need to retry an entire
-- transaction.
local function try_commit_several_times(func, ...)
local ok = false
local ret
while not ok do
local commit_func, rollback_func = begin_if_not_in_txn()
ok, ret = pcall(func, commit_func, ...)
if ok then
return ret
end
rollback_func()
require('fiber').yield()
end
end

-- put task in space
function method.put(self, data, opts)
local function put(self, data, opts, commit_func)
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
Expand All @@ -197,8 +212,6 @@ function method.put(self, data, opts)
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
local commit_func = begin_if_not_in_txn()

local max = self.space.index.task_id:max()

local id = max and max[1] + 1 or 0
Expand All @@ -213,11 +226,18 @@ function method.put(self, data, opts)
return task
end

-- put task in space
function method.put(self, data, opts)
local commit_body = function(commit_func)
return put(self, data, opts, commit_func)
end

return try_commit_several_times(commit_body)
end

-- Take the first task form the ready_buffer.
local function take_ready(self)
local function take_ready(self, commit_func)
while true do
local commit_func = begin_if_not_in_txn()

local task_ready = self.space_ready_buffer.index.task_id:min()
if task_ready == nil then
commit_func()
Expand Down Expand Up @@ -247,45 +267,57 @@ local function take_ready(self)
end
end

local function take(self)
for s, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
break
end
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
local commit_func = begin_if_not_in_txn()
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
local take_complete = false
local function take_step(self, task, commit_func)
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
local take_complete = false

if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
take_complete = true
end
if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
take_complete = true
end

commit_func()
if take_complete then
self.on_task_change(task, 'take')
return task
end
commit_func()
if take_complete then
self.on_task_change(task, 'take')
return task
end
end

-- take task
function method.take(self)
if self.ready_space_mode then
return take_ready(self)
local commit_body = function(commit_func)
return take_ready(self, commit_func)
end

return try_commit_several_times(commit_body)
end

for _, task in self.space.index.status:pairs(state.READY,
{ iterator = 'GE' }) do
if task[2] ~= state.READY then
break
end

local commit_body = function(commit_func)
return take_step(self, task, commit_func)
end

local ret = try_commit_several_times(commit_body)
if ret ~= nil then
return ret
end
end
return take(self)
end

-- touch task
Expand All @@ -300,9 +332,7 @@ local function delete_ready(self, id, utube)
end

-- delete task
function method.delete(self, id)
local commit_func = begin_if_not_in_txn()

local function delete(self, id, commit_func)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
Expand Down Expand Up @@ -331,10 +361,17 @@ function method.delete(self, id)
return task
end

-- release task
function method.release(self, id, opts)
local commit_func = begin_if_not_in_txn()
-- delete task
function method.delete(self, id)
local commit_body = function(commit_func)
return delete(self, id, commit_func)
end

return try_commit_several_times(commit_body)
end

-- release task
local function release(self, id, opts, commit_func)
local task = self.space:update(id, {{ '=', 2, state.READY }})
if task ~= nil then
if self.ready_space_mode then
Expand All @@ -357,10 +394,17 @@ function method.release(self, id, opts)
return task
end

-- bury task
function method.bury(self, id)
local commit_func = begin_if_not_in_txn()
-- release task
function method.release(self, id, opts)
local commit_body = function(commit_func)
return release(self, id, opts, commit_func)
end

return try_commit_several_times(commit_body)
end

-- bury task
local function bury(self, id, commit_func)
local current_task = self.space:get{id}
local task = self.space:update(id, {{ '=', 2, state.BURIED }})
if task ~= nil then
Expand Down Expand Up @@ -390,35 +434,54 @@ function method.bury(self, id)
return task
end

-- unbury several tasks
function method.kick(self, count)
for i = 1, count do
local commit_func = begin_if_not_in_txn()
-- bury task
function method.bury(self, id)
local commit_body = function(commit_func)
return bury(self, id, commit_func)
end

local task = self.space.index.status:min{ state.BURIED }
if task == nil then
return i - 1
end
if task[2] ~= state.BURIED then
return i - 1
end
return try_commit_several_times(commit_body)
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.ready_space_mode then
local prev_task = self.space_ready_buffer.index.utube:get{task[3]}
if prev_task ~= nil then
if prev_task[1] > task[1] then
self.space_ready_buffer:delete(prev_task[1])
self.space_ready_buffer:insert({task[1], task[2]})
end
else
put_ready(self, task[3])
-- unbury several tasks
local function kick_step(self, id, commit_func)
local task = self.space.index.status:min{ state.BURIED }
if task == nil then
return id - 1
end
if task[2] ~= state.BURIED then
return id - 1
end

task = self.space:update(task[1], {{ '=', 2, state.READY }})
if self.ready_space_mode then
local prev_task = self.space_ready_buffer.index.utube:get{task[3]}
if prev_task ~= nil then
if prev_task[1] > task[1] then
self.space_ready_buffer:delete(prev_task[1])
self.space_ready_buffer:insert({task[1], task[2]})
end
else
put_ready(self, task[3])
end
end

commit_func()
commit_func()

self.on_task_change(task, 'kick')
end

self.on_task_change(task, 'kick')
-- unbury several tasks
function method.kick(self, count)
for i = 1, count do
local commit_body = function(commit_func)
return kick_step(self, i, commit_func)
end

local ret = try_commit_several_times(commit_body)
if ret ~= nil then
return ret
end
end
return count
end
Expand Down
40 changes: 9 additions & 31 deletions t/030-utube.t
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@ test:ok(queue, 'queue is loaded')
local tube = queue.create_tube('test', 'utube', { engine = engine })
local tube2 = queue.create_tube('test_stat', 'utube', { engine = engine })
local tube_ready, tube2_ready
if engine ~= 'vinyl' then
tube_ready = queue.create_tube('test_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
tube2_ready = queue.create_tube('test_stat_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
end
tube_ready = queue.create_tube('test_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
tube2_ready = queue.create_tube('test_stat_ready', 'utube',
{ engine = engine, storage_mode = queue.driver.utube.STORAGE_MODE_READY_BUFFER })
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'utube', 'tube.type')

test:test('Utube statistics', function(test)
if engine ~= 'vinyl' then
test:plan(13 * 2)
else
test:plan(13)
end
test:plan(13 * 2)
for _, tube_stat in ipairs({tube2, tube2_ready}) do
if tube_stat == nil then
break
Expand Down Expand Up @@ -78,11 +72,7 @@ end)


test:test('Easy put/take/ack', function(test)
if engine ~= 'vinyl' then
test:plan(12 * 2)
else
test:plan(12)
end
test:plan(12 * 2)

for _, test_tube in ipairs({tube, tube_ready}) do
if test_tube == nil then
Expand Down Expand Up @@ -110,11 +100,7 @@ test:test('Easy put/take/ack', function(test)
end)

test:test('ack in utube', function(test)
if engine ~= 'vinyl' then
test:plan(8 * 2)
else
test:plan(8)
end
test:plan(8 * 2)

for _, test_tube in ipairs({tube, tube_ready}) do
if test_tube == nil then
Expand Down Expand Up @@ -145,11 +131,7 @@ test:test('ack in utube', function(test)
end
end)
test:test('bury in utube', function(test)
if engine ~= 'vinyl' then
test:plan(8 * 2)
else
test:plan(8)
end
test:plan(8 * 2)

for _, test_tube in ipairs({tube, tube_ready}) do
if test_tube == nil then
Expand Down Expand Up @@ -180,11 +162,7 @@ test:test('bury in utube', function(test)
end
end)
test:test('instant bury', function(test)
if engine ~= 'vinyl' then
test:plan(1 * 2)
else
test:plan(1)
end
test:plan(1 * 2)
tube:put(1, {ttr=60})
local taken = tube:take(.1)
test:is(tube:bury(taken[1])[2], '!', 'task is buried')
Expand Down
Loading