-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
71 changed files
with
5,871 additions
and
5,259 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Run manually to reformat a file: | ||
# clang-format -i --style=file <file> | ||
# find . -iname '*.cc' -o -iname '*.h' -o -iname '*.h.in' | xargs clang-format -i --style=file | ||
BasedOnStyle: Google | ||
DerivePointerAlignment: false | ||
|
||
# FIXME(hds) | ||
IndentWidth: 4 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,11 @@ | ||
// Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved | ||
// | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
|
@@ -14,31 +14,30 @@ | |
|
||
// Authors: Zhangyi Chen([email protected]) | ||
|
||
#include "braft/ballot_box.h" | ||
|
||
#include <bthread/unstable.h> | ||
#include <butil/scoped_lock.h> | ||
#include <bvar/latency_recorder.h> | ||
#include <bthread/unstable.h> | ||
|
||
#include <cstddef> | ||
#include <optional> | ||
#include "braft/ballot_box.h" | ||
#include "braft/util.h" | ||
#include "braft/fsm_caller.h" | ||
|
||
#include "braft/closure_queue.h" | ||
#include "braft/fsm_caller.h" | ||
#include "braft/util.h" | ||
|
||
namespace braft { | ||
|
||
BallotBox::BallotBox() | ||
: _waiter(NULL) | ||
, _closure_queue(NULL) | ||
, _last_committed_index(0) | ||
, _pending_index(0) | ||
{ | ||
} | ||
: _waiter(NULL), | ||
_closure_queue(NULL), | ||
_last_committed_index(0), | ||
_pending_index(0) {} | ||
|
||
BallotBox::~BallotBox() { | ||
clear_pending_tasks(); | ||
} | ||
BallotBox::~BallotBox() { clear_pending_tasks(); } | ||
|
||
int BallotBox::init(const BallotBoxOptions &options) { | ||
int BallotBox::init(const BallotBoxOptions& options) { | ||
if (options.waiter == NULL || options.closure_queue == NULL) { | ||
LOG(ERROR) << "waiter is NULL"; | ||
return EINVAL; | ||
|
@@ -48,9 +47,9 @@ int BallotBox::init(const BallotBoxOptions &options) { | |
return 0; | ||
} | ||
|
||
int BallotBox::commit_at( | ||
int64_t first_log_index, int64_t last_log_index, const PeerId& peer) { | ||
// FIXME(chenzhangyi01): The cricital section is unacceptable because it | ||
int BallotBox::commit_at(int64_t first_log_index, int64_t last_log_index, | ||
const PeerId& peer) { | ||
// FIXME(chenzhangyi01): The cricital section is unacceptable because it | ||
// blocks all the other Replicators and LogManagers | ||
std::unique_lock<raft_mutex_t> lck(_mutex); | ||
if (_pending_index == 0) { | ||
|
@@ -59,14 +58,16 @@ int BallotBox::commit_at( | |
if (last_log_index < _pending_index) { | ||
return 0; | ||
} | ||
if (last_log_index >= _pending_index + (int64_t)_pending_meta_queue.size()) { | ||
if (last_log_index >= | ||
_pending_index + (int64_t)_pending_meta_queue.size()) { | ||
return ERANGE; | ||
} | ||
|
||
int64_t last_committed_index = 0; | ||
const int64_t start_at = std::max(_pending_index, first_log_index); | ||
Ballot::PosHint pos_hint; | ||
for (int64_t log_index = start_at; log_index <= last_log_index; ++log_index) { | ||
for (int64_t log_index = start_at; log_index <= last_log_index; | ||
++log_index) { | ||
Ballot& bl = _pending_meta_queue[log_index - _pending_index]; | ||
pos_hint = bl.grant(peer, pos_hint); | ||
if (bl.granted()) { | ||
|
@@ -82,15 +83,17 @@ int BallotBox::commit_at( | |
// peers, the quorum would decrease by 1, e.g. 3 of 4 changes to 2 of 3. In | ||
// this case, the log after removal may be committed before some previous | ||
// logs, since we use the new configuration to deal the quorum of the | ||
// removal request, we think it's safe to commit all the uncommitted | ||
// removal request, we think it's safe to commit all the uncommitted | ||
// previous logs, which is not well proved right now | ||
// TODO: add vlog when committing previous logs | ||
for (int64_t index = _pending_index; index <= last_committed_index; ++index) { | ||
for (int64_t index = _pending_index; index <= last_committed_index; | ||
++index) { | ||
_pending_meta_queue.pop_front(); | ||
} | ||
|
||
_pending_index = last_committed_index + 1; | ||
_last_committed_index.store(last_committed_index, butil::memory_order_relaxed); | ||
_last_committed_index.store(last_committed_index, | ||
butil::memory_order_relaxed); | ||
lck.unlock(); | ||
// The order doesn't matter | ||
_waiter->on_committed(last_committed_index); | ||
|
@@ -111,16 +114,17 @@ int BallotBox::clear_pending_tasks() { | |
int BallotBox::reset_pending_index(int64_t new_pending_index) { | ||
BAIDU_SCOPED_LOCK(_mutex); | ||
CHECK(_pending_index == 0 && _pending_meta_queue.empty()) | ||
<< "pending_index " << _pending_index << " pending_meta_queue " | ||
<< "pending_index " << _pending_index << " pending_meta_queue " | ||
<< _pending_meta_queue.size(); | ||
CHECK_GT(new_pending_index, _last_committed_index.load( | ||
butil::memory_order_relaxed)); | ||
CHECK_GT(new_pending_index, | ||
_last_committed_index.load(butil::memory_order_relaxed)); | ||
_pending_index = new_pending_index; | ||
_closure_queue->reset_first_index(new_pending_index); | ||
return 0; | ||
} | ||
|
||
int BallotBox::append_pending_task(const Configuration& conf, const Configuration* old_conf, | ||
int BallotBox::append_pending_task(const Configuration& conf, | ||
const Configuration* old_conf, | ||
Closure* closure) { | ||
Ballot bl; | ||
bl.init(conf, | ||
|
@@ -142,12 +146,14 @@ int BallotBox::set_last_committed_index(int64_t last_committed_index) { | |
<< ", parameter last_committed_index=" << last_committed_index; | ||
return -1; | ||
} | ||
if (last_committed_index < | ||
_last_committed_index.load(butil::memory_order_relaxed)) { | ||
if (last_committed_index < | ||
_last_committed_index.load(butil::memory_order_relaxed)) { | ||
return EINVAL; | ||
} | ||
if (last_committed_index > _last_committed_index.load(butil::memory_order_relaxed)) { | ||
_last_committed_index.store(last_committed_index, butil::memory_order_relaxed); | ||
if (last_committed_index > | ||
_last_committed_index.load(butil::memory_order_relaxed)) { | ||
_last_committed_index.store(last_committed_index, | ||
butil::memory_order_relaxed); | ||
lck.unlock(); | ||
_waiter->on_committed(last_committed_index); | ||
} | ||
|
@@ -164,7 +170,7 @@ void BallotBox::describe(std::ostream& os, bool use_html) { | |
pending_queue_size = _pending_meta_queue.size(); | ||
} | ||
lck.unlock(); | ||
const char *newline = use_html ? "<br>" : "\r\n"; | ||
const char* newline = use_html ? "<br>" : "\r\n"; | ||
os << "last_committed_index: " << committed_index << newline; | ||
if (pending_queue_size != 0) { | ||
os << "pending_index: " << pending_index << newline; | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,11 @@ | ||
// Copyright (c) 2015 Baidu.com, Inc. All Rights Reserved | ||
// | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
|
@@ -14,42 +14,40 @@ | |
|
||
// Authors: Zhangyi Chen([email protected]) | ||
|
||
#ifndef BRAFT_BALLOT_BOX_H | ||
#define BRAFT_BALLOT_BOX_H | ||
#ifndef BRAFT_BALLOT_BOX_H | ||
#define BRAFT_BALLOT_BOX_H | ||
|
||
#include <butil/atomicops.h> // butil::atomic | ||
#include <stdint.h> // int64_t | ||
|
||
#include <stdint.h> // int64_t | ||
#include <set> // std::set | ||
#include <deque> | ||
#include <butil/atomicops.h> // butil::atomic | ||
#include <set> // std::set | ||
|
||
#include "braft/ballot.h" | ||
#include "braft/raft.h" | ||
#include "braft/util.h" | ||
#include "braft/ballot.h" | ||
|
||
namespace braft { | ||
|
||
class FSMCaller; | ||
class ClosureQueue; | ||
|
||
struct BallotBoxOptions { | ||
BallotBoxOptions() | ||
: waiter(NULL) | ||
, closure_queue(NULL) | ||
{} | ||
BallotBoxOptions() : waiter(NULL), closure_queue(NULL) {} | ||
FSMCaller* waiter; | ||
ClosureQueue* closure_queue; | ||
}; | ||
|
||
struct BallotBoxStatus { | ||
BallotBoxStatus() | ||
: committed_index(0), pending_index(0), pending_queue_size(0) | ||
{} | ||
: committed_index(0), pending_index(0), pending_queue_size(0) {} | ||
int64_t committed_index; | ||
int64_t pending_index; | ||
int64_t pending_queue_size; | ||
}; | ||
|
||
class BallotBox { | ||
public: | ||
public: | ||
BallotBox(); | ||
~BallotBox(); | ||
|
||
|
@@ -61,46 +59,44 @@ class BallotBox { | |
const PeerId& peer); | ||
|
||
// Called when the leader steps down, otherwise the behavior is undefined | ||
// When a leader steps down, the uncommitted user applications should | ||
// When a leader steps down, the uncommitted user applications should | ||
// fail immediately, which the new leader will deal whether to commit or | ||
// truncate. | ||
int clear_pending_tasks(); | ||
|
||
// Called when a candidate becomes the new leader, otherwise the behavior is | ||
// undefined. | ||
// According to the raft algorithm, the logs from pervious terms can't be | ||
// committed until a log at the new term becomes committed, so | ||
// According to the raft algorithm, the logs from pervious terms can't be | ||
// committed until a log at the new term becomes committed, so | ||
// |new_pending_index| should be |last_log_index| + 1. | ||
int reset_pending_index(int64_t new_pending_index); | ||
|
||
// Called by leader, otherwise the behavior is undefined | ||
// Store application context before replication. | ||
int append_pending_task(const Configuration& conf, | ||
const Configuration* old_conf, | ||
Closure* closure); | ||
int append_pending_task(const Configuration& conf, | ||
const Configuration* old_conf, Closure* closure); | ||
|
||
// Called by follower, otherwise the behavior is undefined. | ||
// Set committed index received from leader | ||
int set_last_committed_index(int64_t last_committed_index); | ||
|
||
int64_t last_committed_index() | ||
{ return _last_committed_index.load(butil::memory_order_acquire); } | ||
int64_t last_committed_index() { | ||
return _last_committed_index.load(butil::memory_order_acquire); | ||
} | ||
|
||
void describe(std::ostream& os, bool use_html); | ||
|
||
void get_status(BallotBoxStatus* ballot_box_status); | ||
|
||
private: | ||
|
||
FSMCaller* _waiter; | ||
ClosureQueue* _closure_queue; | ||
raft_mutex_t _mutex; | ||
butil::atomic<int64_t> _last_committed_index; | ||
int64_t _pending_index; | ||
std::deque<Ballot> _pending_meta_queue; | ||
|
||
private: | ||
FSMCaller* _waiter; | ||
ClosureQueue* _closure_queue; | ||
raft_mutex_t _mutex; | ||
butil::atomic<int64_t> _last_committed_index; | ||
int64_t _pending_index; | ||
std::deque<Ballot> _pending_meta_queue; | ||
}; | ||
|
||
} // namespace braft | ||
|
||
#endif //BRAFT_BALLOT_BOX_H | ||
#endif // BRAFT_BALLOT_BOX_H |
Oops, something went wrong.