Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(make_idempotent): support making incr request idempotent in pegasus_write_service #2192

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ struct update_request
// single-put request from the one translated from a non-idempotent atomic write request:
// - a general single-put request, if `type` is UT_PUT or not set by default as it's
// optional, or
// - a put request translated from a non-idempotent incr request, if `type` is UT_INCR.
// - a put request translated from an incr request, if `type` is UT_INCR.
4:optional update_type type;
}

Expand Down
72 changes: 64 additions & 8 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
_server(server),
_impl(new impl(server)),
_batch_start_time(0),
_make_incr_idempotent_duration_ns(0),
_cu_calculator(server->_cu_calculator.get()),
METRIC_VAR_INIT_replica(put_requests),
METRIC_VAR_INIT_replica(multi_put_requests),
Expand All @@ -171,7 +172,8 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(dup_time_lag_ms),
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
_remove_batch_size(0)
_remove_batch_size(0),
_incr_batch_size(0)
{
}

Expand Down Expand Up @@ -211,6 +213,37 @@ int pegasus_write_service::multi_remove(int64_t decree,
return err;
}

// Translate the incr request `req` into the put request `update` by delegating to the
// implementation object, and record how long that translation took.
//
// The elapsed time (in nanoseconds) is stored in `_make_incr_idempotent_duration_ns`, which
// is later read by put() so that the incr latency also covers the translation.
//
// Returns the value returned by the underlying translation.
int pegasus_write_service::make_idempotent(const dsn::apps::incr_request &req,
                                           dsn::apps::incr_response &err_resp,
                                           dsn::apps::update_request &update)
{
    const uint64_t begin_ns = dsn_now_ns();
    const int status = _impl->make_idempotent(req, err_resp, update);
    const uint64_t end_ns = dsn_now_ns();

    // Duration spent on turning the incr request into an idempotent put request.
    _make_incr_idempotent_duration_ns = end_ns - begin_ns;

    return status;
}

// Apply a put request that was translated from an incr request, and fill the incr
// response `resp`.
//
// The request is counted and timed under the incr metrics rather than the put metrics.
// Returns the value returned by the underlying put.
int pegasus_write_service::put(const db_write_context &ctx,
                               const dsn::apps::update_request &update,
                               dsn::apps::incr_response &resp)
{
    // Shift the start point back by the translation duration so that the recorded incr
    // latency also includes the time spent in make_idempotent().
    METRIC_VAR_AUTO_LATENCY(incr_latency_ns, dsn_now_ns() - _make_incr_idempotent_duration_ns);
    METRIC_VAR_INCREMENT(incr_requests);

    const int status = _impl->put(ctx, update, resp);

    // Capacity units are only accounted for on the primary replica.
    if (!_server->is_primary()) {
        return status;
    }

    _cu_calculator->add_incr_cu(resp.error, update.key);
    return status;
}

int pegasus_write_service::incr(int64_t decree,
const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp)
Expand Down Expand Up @@ -278,7 +311,23 @@ int pegasus_write_service::batch_put(const db_write_context &ctx,
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after batch_prepare");

++_put_batch_size;
if (!update.__isset.type || update.type == dsn::apps::update_type::UT_PUT) {
// This is a general single-put request.
++_put_batch_size;
} else {
// There are only two possible situations for batch_put() where this put request
// originates from an atomic write request:
// - now we are replaying plog into RocksDB at startup of this replica.
// - now we are in a secondary replica: just received a prepare request and appended
// it to plog, now we are applying it into RocksDB.
//
// Though this is a put request, we choose to update the metrics of its original
// request (i.e. the atomic write).
if (update.type == dsn::apps::update_type::UT_INCR) {
++_incr_batch_size;
}
}

int err = _impl->batch_put(ctx, update, resp);

if (_server->is_primary()) {
Expand Down Expand Up @@ -326,7 +375,7 @@ void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_t

void pegasus_write_service::clear_up_batch_states()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function name is not accurate: the function not only cleans up the batch states, but also updates the metrics.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could name this function batch_finish() as opposed to batch_prepare: finish batch write with metrics such as latencies calculated and some states cleared.

{
#define PROCESS_WRITE_BATCH(op) \
#define UPDATE_WRITE_BATCH_METRICS(op) \
do { \
METRIC_VAR_INCREMENT_BY(op##_requests, static_cast<int64_t>(_##op##_batch_size)); \
METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(_##op##_batch_size), latency_ns); \
Expand All @@ -335,20 +384,27 @@ void pegasus_write_service::clear_up_batch_states()

auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);

PROCESS_WRITE_BATCH(put);
PROCESS_WRITE_BATCH(remove);
// Take the latency of executing the entire batch as the latency for processing each
// request within it, since the latency of each request could not be known.
UPDATE_WRITE_BATCH_METRICS(put);
UPDATE_WRITE_BATCH_METRICS(remove);

// Since the duration of translation is unknown for both possible situations where these
// put requests are actually translated from atomic requests (see comments in batch_put()),
// there's no need to add `_make_incr_idempotent_duration_ns` to the total latency.
UPDATE_WRITE_BATCH_METRICS(incr);

_batch_start_time = 0;

#undef PROCESS_WRITE_BATCH
#undef UPDATE_WRITE_BATCH_METRICS
}

int pegasus_write_service::duplicate(int64_t decree,
const dsn::apps::duplicate_request &requests,
const dsn::apps::duplicate_request &update,
dsn::apps::duplicate_response &resp)
{
// Verifies the cluster_id.
for (const auto &request : requests.entries) {
for (const auto &request : update.entries) {
if (!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) {
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint("request cluster id is unconfigured");
Expand Down
30 changes: 28 additions & 2 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,19 @@ class pegasus_write_service : dsn::replication::replica_base
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);

// Write INCR record.
// Translate an INCR request into an idempotent PUT request. Only called by primary
// replicas.
int make_idempotent(const dsn::apps::incr_request &req,
dsn::apps::incr_response &err_resp,
dsn::apps::update_request &update);

// Write an idempotent INCR record (i.e. a PUT record) and reply to the client with INCR
// response. Only called by primary replicas.
int put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::incr_response &resp);

// Write a non-idempotent INCR record.
int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp);

// Write CHECK_AND_SET record.
Expand Down Expand Up @@ -167,6 +179,8 @@ class pegasus_write_service : dsn::replication::replica_base
// Add PUT record in batch write.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
// When called by secondary replicas, this put request may be translated from an incr
// request for idempotence.
int batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);
Expand Down Expand Up @@ -202,6 +216,12 @@ class pegasus_write_service : dsn::replication::replica_base

uint64_t _batch_start_time;

// Only used for primary replica to calculate the duration that an incr request from
// the client is translated into an idempotent put request before appended to plog,
// including reading the current value from RocksDB and incrementing it by a given
// amount.
uint64_t _make_incr_idempotent_duration_ns;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a member variable, which will be shared by all requests. But it's a request related latency, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is defined as per-replica rather than per-request, for the reason that the current design for implementing idempotence is to make sure there is only one atomic request being processed in the write pipeline for each replica. This pipeline consists of the following stages:
(1) read the current value from RocksDB and build the idempotent request based on it;
(2) append the corresponding mutation to plog;
(3) broadcast the prepare requests;
(4) apply the result for atomic operation back to RocksDB ultimately.

For a request, this variable will be set in stage (1) and read in stage (4); since there is only one request in the pipeline, this variable is guaranteed not to be set for another request before stage (4) is finished. Therefore, it is safe to define this variable as per-replica.


capacity_unit_calculator *_cu_calculator;

METRIC_VAR_DECLARE_counter(put_requests);
Expand All @@ -224,10 +244,16 @@ class pegasus_write_service : dsn::replication::replica_base
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
METRIC_VAR_DECLARE_counter(dup_lagging_writes);

// Record batch size for put and remove requests.
// Measure the size of single-put requests in batch applied into RocksDB for metrics.
uint32_t _put_batch_size;

// Measure the size of single-remove requests in batch applied into RocksDB for metrics.
uint32_t _remove_batch_size;

// Measure the size of incr requests (with each translated into an idempotent put request)
// in batch applied into RocksDB for metrics.
uint32_t _incr_batch_size;

// TODO(wutao1): add metrics for failed rpc.
};

Expand Down
6 changes: 3 additions & 3 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return resp.error;
}

// Tranlate an incr request which is certainly non-idempotent into a single-put request
// which is certainly idempotent. Return current status for RocksDB.
// Translate an incr request into a single-put request which is certainly idempotent.
// Return current status for RocksDB. Only called by primary replicas.
int make_idempotent(const dsn::apps::incr_request &req,
dsn::apps::incr_response &err_resp,
dsn::apps::update_request &update)
Expand Down Expand Up @@ -229,7 +229,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
}

// Apply single-put request translated from incr request into RocksDB, and build response
// for incr. Return current status for RocksDB.
// for incr. Return current status for RocksDB. Only called by primary replicas.
int put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::incr_response &resp)
Expand Down
3 changes: 3 additions & 0 deletions src/utils/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -1342,6 +1342,9 @@ class percentile : public closeable_metric
_samples.get()[index & (_sample_size - 1)] = val;
}

// Set the same value for n times, used to treat a single value as the result of multiple
// observations, e.g. taking the latency of executing the entire batch as the latency for
// processing each request within it (see pegasus_write_service::clear_up_batch_states()).
void set(size_t n, const value_type &val)
{
for (size_t i = 0; i < n; ++i) {
Expand Down
Loading