Skip to content

Commit

Permalink
Merge pull request #858 from robertbaldyga/attach-fix-race-condition
Browse files Browse the repository at this point in the history
Fix race condition during cache attach
  • Loading branch information
robertbaldyga authored Nov 25, 2024
2 parents 2c28f33 + b850727 commit e630b81
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 13 deletions.
5 changes: 5 additions & 0 deletions src/engine/cache_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ void ocf_resolve_effective_cache_mode(ocf_cache_t cache,
return;
}

if (env_atomic_read(&cache->attach_pt)) {
req->cache_mode = ocf_req_cache_mode_pt;
return;
}

if (cache->pt_unaligned_io && !ocf_req_is_4k(req->addr, req->bytes)) {
req->cache_mode = ocf_req_cache_mode_pt;
return;
Expand Down
23 changes: 20 additions & 3 deletions src/mngt/ocf_mngt_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1901,18 +1901,35 @@ static void _ocf_mngt_attach_shutdown_status(ocf_pipeline_t pipeline,
_ocf_mngt_attach_shutdown_status_complete, context);
}


static void _ocf_mngt_attach_post_init_finish(void *priv)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;

ocf_refcnt_unfreeze(&cache->refcnt.d2c);

env_atomic_set(&cache->attach_pt, 0);

ocf_cache_log(cache, log_debug, "Cache attached\n");

ocf_pipeline_next(context->pipeline);
}

static void _ocf_mngt_attach_post_init(ocf_pipeline_t pipeline,
void *priv, ocf_pipeline_arg_t arg)
{
struct ocf_cache_attach_context *context = priv;
ocf_cache_t cache = context->cache;

env_atomic_set(&cache->attach_pt, 1);

ocf_cleaner_refcnt_unfreeze(cache);
ocf_refcnt_unfreeze(&cache->refcnt.metadata);

ocf_cache_log(cache, log_debug, "Cache attached\n");

ocf_pipeline_next(pipeline);
ocf_refcnt_freeze(&cache->refcnt.d2c);
ocf_refcnt_register_zero_cb(&cache->refcnt.d2c,
_ocf_mngt_attach_post_init_finish, context);
}

static void _ocf_mngt_attach_handle_error(
Expand Down
2 changes: 2 additions & 0 deletions src/ocf_cache_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ struct ocf_cache {
env_atomic flush_in_progress;
env_mutex flush_mutex;

env_atomic attach_pt;

struct ocf_cleaner cleaner;

struct list_head io_queues;
Expand Down
35 changes: 30 additions & 5 deletions tests/functional/pyocf/types/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ def alloc_device_config(self, device, perform_test=True):
def free_device_config(self, cfg):
lib = OcfLib.getInstance().ocf_volume_destroy(cfg._volume)

def attach_device(
def attach_device_async(
self,
device,
force=False,
Expand All @@ -593,14 +593,39 @@ def attach_device(

self.write_lock()

c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
def callback(c):
self.write_unlock()
self.free_device_config(device_config)

c = OcfCompletion(
[("cache", c_void_p), ("priv", c_void_p), ("error", c_int)],
callback=callback
)

self.owner.lib.ocf_mngt_cache_attach(self.cache_handle, byref(attach_cfg), c, None)
c.wait()

self.write_unlock()
return c

self.free_device_config(device_config)
def attach_device(
self,
device,
force=False,
perform_test=False,
cache_line_size=None,
open_cores=False,
disable_cleaner=False,
):

c = self.attach_device_async(
device,
force,
perform_test,
cache_line_size,
open_cores,
disable_cleaner
)

c.wait()

if c.results["error"]:
raise OcfError(
Expand Down
5 changes: 4 additions & 1 deletion tests/functional/pyocf/types/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __getitem__(self, key):
except KeyError:
raise KeyError(f"No completion argument {key} specified")

def __init__(self, completion_args: list, context=None):
def __init__(self, completion_args: list, context=None, callback=None):
"""
Provide ctypes arg list, and optionally index of status argument in
completion function which will be extracted (default - last argument).
Expand All @@ -95,13 +95,16 @@ def __init__(self, completion_args: list, context=None):
self.results = OcfCompletion.CompletionResult(completion_args)
self._as_parameter_ = self.callback
self.context = context
self.user_callback = callback

@property
def callback(self):
@CFUNCTYPE(c_void_p, *self.results.arg_types)
def complete(*args):
self.results.results = args
self.e.set()
if self.user_callback:
self.user_callback(self)

return complete

Expand Down
18 changes: 14 additions & 4 deletions tests/functional/tests/engine/test_d2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#


from time import sleep
import pytest


Expand All @@ -18,7 +19,6 @@
CORE_SIZE = 4096


@pytest.mark.xfail(reason="Data corruption when switching from D2C")
def test_d2c_io(pyocf_ctx):
"""
Start cache in D2C
Expand Down Expand Up @@ -46,7 +46,8 @@ def test_d2c_io(pyocf_ctx):
d2c_data.write(b"a" * CORE_SIZE, CORE_SIZE)
d2c_io.set_data(d2c_data)

cache.attach_device(cache_device)
c = cache.attach_device_async(cache_device)
sleep(1)

wt_io = vol.new_io(queue, 0, CORE_SIZE, IoDir.WRITE, 0, 0)
wt_data = Data(CORE_SIZE)
Expand All @@ -55,18 +56,27 @@ def test_d2c_io(pyocf_ctx):

wt_completion = Sync(wt_io).submit()
assert int(wt_completion.results["err"]) == 0
assert cache.get_stats()["req"]["wr_full_misses"]["value"] == 1

d2c_completion = Sync(d2c_io).submit()
assert int(d2c_completion.results["err"]) == 0
assert cache.get_stats()["req"]["wr_pt"]["value"] == 1

c.wait()

if c.results["error"]:
raise OcfError(
f"Attaching cache device failed",
c.results["error"],
)

assert cache.get_stats()["req"]["wr_pt"]["value"] == 2

read_io = vol.new_io(queue, 0, CORE_SIZE, IoDir.READ, 0, 0)
read_data = Data(CORE_SIZE)
read_io.set_data(read_data)

read_completion = Sync(read_io).submit()
assert int(read_completion.results["err"]) == 0
assert cache.get_stats()["req"]["rd_full_misses"]["value"] == 1

cache.stop()

Expand Down

0 comments on commit e630b81

Please sign in to comment.