Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

draft: implement the prepare fiber canceling #2126

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions cartridge/twophase.lua
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ vars:new('locks', {})
vars:new('prepared_config', nil)
vars:new('prepared_config_release_notification', fiber.cond())
vars:new('on_patch_triggers', {})
vars:new('prepare_fiber', nil)

vars:new('options', {
netbox_call_timeout = 1,
Expand All @@ -49,6 +50,7 @@ vars:new('options', {
local function release_config_lock()
local prepared_config = vars.prepared_config
vars.prepared_config = nil
vars.prepare_fiber = nil
vars.prepared_config_release_notification:broadcast()
return prepared_config
end
Expand Down Expand Up @@ -134,6 +136,7 @@ end
-- @treturn[2] nil
-- @treturn[2] table Error description
local function prepare_2pc(upload_id)
vars.prepare_fiber = fiber.self()
local data
if type(upload_id) == 'table' then
-- Preserve compatibility with older versions.
Expand Down Expand Up @@ -171,15 +174,16 @@ local function prepare_2pc(upload_id)

local workdir = confapplier.get_workdir()
local path_prepare = fio.pathjoin(workdir, 'config.prepare')

if vars.prepared_config ~= nil then
local err = Prepare2pcError:new('Two-phase commit is locked')
log.warn('%s', err)
return nil, err
end

local ok, err = ClusterwideConfig.save(clusterwide_config, path_prepare)
if not ok and fio.path.exists(path_prepare) then
local prepare_path_exists = fio.path.exists(path_prepare)
fiber.testcancel()
if not ok and prepare_path_exists then
err = Prepare2pcError:new('Two-phase commit is locked')
end

Expand Down Expand Up @@ -267,13 +271,37 @@ local function commit_2pc()
end
end

local function cancel_prepare_fiber()
local f = vars.prepare_fiber
if not f then
log.warn("Prepare fiber was not created")
return
end
if f:status() == 'dead' then
return
end
f:set_joinable(true)
local ok, err
ok, err = pcall(f.cancel, f)
if not ok then
log.warn("Cancel prepare fiber error: " .. err)
return
end
ok, err = pcall(f.join, f)
if not ok then
log.warn("Join prepare fiber error: " .. err)
return
end
end

--- Two-phase commit - abort stage.
--
-- Release the lock for further commit attempts.
-- @function abort_2pc
-- @local
-- @treturn boolean true
local function abort_2pc()
cancel_prepare_fiber()
local workdir = confapplier.get_workdir()
local path_prepare = fio.pathjoin(workdir, 'config.prepare')
ClusterwideConfig.remove(path_prepare)
Expand Down Expand Up @@ -440,7 +468,6 @@ local function twophase_commit(opts)
end

local _2pc_error
local abortion_list = {}
local activity_name = opts.activity_name or 'twophase_commit'

goto prepare
Expand Down Expand Up @@ -472,7 +499,6 @@ local function twophase_commit(opts)
for _, uri in ipairs(opts.uri_list) do
if retmap[uri] then
log.warn('Prepared for %s at %s', activity_name, uri)
table.insert(abortion_list, uri)
end
end
for _, uri in ipairs(opts.uri_list) do
Expand Down Expand Up @@ -523,11 +549,11 @@ local function twophase_commit(opts)
log.warn('(2PC) %s abort phase...', activity_name)

local retmap, errmap = pool.map_call(opts.fn_abort, nil,{
uri_list = abortion_list,
uri_list = opts.uri_list,
timeout = vars.options.netbox_call_timeout,
})

for _, uri in ipairs(abortion_list) do
for _, uri in ipairs(opts.uri_list) do
if retmap[uri] then
log.warn('Aborted %s at %s', activity_name, uri)
else
Expand Down
11 changes: 10 additions & 1 deletion cartridge/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,14 @@ local function mktree(path)
for _, dir in ipairs(dirs) do
current_dir = fio.pathjoin(current_dir, dir)
local stat = fio.stat(current_dir)
fiber.testcancel()
if stat == nil then
local _, err = fio.mkdir(current_dir)
fiber.testcancel()
local _errno = errno()
if err ~= nil and not fio.path.is_dir(current_dir) then
local is_dir = fio.path.is_dir(current_dir)
fiber.testcancel()
if err ~= nil and not is_dir then
return nil, errors.new('MktreeError',
'Error creating directory %q: %s',
current_dir, errno.strerror(_errno)
Expand Down Expand Up @@ -208,21 +212,26 @@ local function file_write(path, data, opts, perm)
opts = opts or {'O_CREAT', 'O_WRONLY', 'O_TRUNC'}
perm = perm or tonumber(644, 8)
local file = fio.open(path, opts, perm)
fiber.testcancel()
if file == nil then
return nil, OpenFileError:new('%s: %s', path, errno.strerror())
end

local res = file:write(data)
fiber.testcancel()
if not res then
local err = WriteFileError:new('%s: %s', path, errno.strerror())
fio.unlink(path)
fiber.testcancel()
return nil, err
end

local res = file:close()
fiber.testcancel()
if not res then
local err = WriteFileError:new('%s: %s', path, errno.strerror())
fio.unlink(path)
fiber.testcancel()
return nil, err
end

Expand Down
15 changes: 15 additions & 0 deletions test/integration/twophase_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,18 @@ function g.test_timeouts()
t.assert_equals(twophase.get_apply_config_timeout(), 111)
end)
end

-- Check we don't lock a clusterwide config updates
-- after exceeding of `validate_timeout_config` timeout
-- see https://github.com/tarantool/cartridge/issues/2119
function g.test_2pc_is_locked_after_prepare_timeout()
g.s1:exec(function()
local t = require('luatest')
local twophase = require('cartridge.twophase')
twophase.set_validate_config_timeout(0.001)
twophase.patch_clusterwide({})
twophase.set_validate_config_timeout(10) -- default
local _, err = twophase.patch_clusterwide({})
t.assert_not(err)
end)
end