Skip to content

Commit

Permalink
issue=#1202 ts unload all tablets on exit (#1203)
Browse files Browse the repository at this point in the history
  • Loading branch information
00k committed Apr 16, 2017
1 parent fde4243 commit 311d81b
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 44 deletions.
11 changes: 10 additions & 1 deletion src/io/tablet_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,19 @@ extern tera::Counter row_read_delay;
namespace tera {
namespace io {

TabletIO::TabletIO(const std::string& key_start, const std::string& key_end)
std::ostream& operator << (std::ostream& o, const TabletIO& tablet_io) {
o << tablet_io.short_path_
<< " [" << DebugString(tablet_io.start_key_)
<< ", " << DebugString(tablet_io.end_key_) << "]";
return o;
}

TabletIO::TabletIO(const std::string& key_start, const std::string& key_end,
const std::string& path)
: async_writer_(NULL),
start_key_(key_start),
end_key_(key_end),
short_path_(path),
compact_status_(kTableNotCompact),
status_(kNotInit),
ref_count_(1), db_ref_count_(0), db_(NULL),
Expand Down
6 changes: 5 additions & 1 deletion src/io/tablet_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ class TabletIO {
typedef boost::function<void (std::vector<const RowMutationSequence*>*,
std::vector<StatusCode>*)> WriteCallback;

friend std::ostream& operator << (std::ostream& o, const TabletIO& tablet_io);

public:
TabletIO(const std::string& key_start, const std::string& key_end);
TabletIO(const std::string& key_start, const std::string& key_end,
const std::string& path);
virtual ~TabletIO();

// for testing
Expand Down Expand Up @@ -247,6 +250,7 @@ class TabletIO {
std::string tablet_path_;
const std::string start_key_;
const std::string end_key_;
const std::string short_path_;
std::string raw_start_key_;
std::string raw_end_key_;
CompactStatus compact_status_;
Expand Down
18 changes: 9 additions & 9 deletions src/io/test/load_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ TEST_F(TabletIOTest, General) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);
tablet.SetMockEnv(env);
Expand Down Expand Up @@ -136,7 +136,7 @@ TEST_F(TabletIOTest, CurrentLost) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);
env->SetGetChildrenCallback(DropCurrent);
Expand Down Expand Up @@ -167,7 +167,7 @@ TEST_F(TabletIOTest, CurrentReadFailed) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);
env->SetNewSequentialFileFailedCallback(CannotReadCurrent);
Expand Down Expand Up @@ -204,7 +204,7 @@ TEST_F(TabletIOTest, CurrentCorrupted) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);

Expand Down Expand Up @@ -242,7 +242,7 @@ TEST_F(TabletIOTest, ManifestLost) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);

Expand Down Expand Up @@ -273,7 +273,7 @@ TEST_F(TabletIOTest, ManifestReadFailed) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);
env->SetNewSequentialFileFailedCallback(CannotReadManifest);
Expand Down Expand Up @@ -310,7 +310,7 @@ TEST_F(TabletIOTest, ManifestCorrupted) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);

Expand Down Expand Up @@ -341,7 +341,7 @@ TEST_F(TabletIOTest, SstLost) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();
env->SetPrefix(mock_env_prefix);

Expand All @@ -364,7 +364,7 @@ TEST_F(TabletIOTest, SstLostButIgnore) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
leveldb::MockEnv* env = (leveldb::MockEnv*)LeveldbMockEnv();

std::string fname = mock_env_prefix + tablet_path + "/0/__oops";
Expand Down
30 changes: 15 additions & 15 deletions src/io/test/tablet_io_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TEST_F(TabletIOTest, General) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand All @@ -115,7 +115,7 @@ TEST_F(TabletIOTest, Split) {
StatusCode status;
uint64_t size = 0;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand All @@ -136,7 +136,7 @@ TEST_F(TabletIOTest, Split) {
// open tablet for other key scope
key_start = "5000";
key_end = "8000";
TabletIO other_tablet(key_start, key_end);
TabletIO other_tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(other_tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
other_tablet.GetDataSize(&size, NULL, &status);
Expand All @@ -150,7 +150,7 @@ TEST_F(TabletIOTest, Split) {

key_start = "";
key_end = "5000";
TabletIO l_tablet(key_start, key_end);
TabletIO l_tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(l_tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
l_tablet.GetDataSize(&size, NULL, &status);
Expand All @@ -160,7 +160,7 @@ TEST_F(TabletIOTest, Split) {

key_start = "8000";
key_end = "";
TabletIO r_tablet(key_start, key_end);
TabletIO r_tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(r_tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
r_tablet.GetDataSize(&size, NULL, &status);
Expand All @@ -177,7 +177,7 @@ TEST_F(TabletIOTest, SplitAndCheckSize) {
StatusCode status;
uint64_t size = 0;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand All @@ -197,15 +197,15 @@ TEST_F(TabletIOTest, SplitAndCheckSize) {
EXPECT_TRUE(tablet.Unload());

// open from split key to check scope size
TabletIO l_tablet(key_start, split_key);
TabletIO l_tablet(key_start, split_key, tablet_path);
EXPECT_TRUE(l_tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
l_tablet.GetDataSize(&size, NULL, &status);
LOG(INFO) << "table[" << key_start << ", " << split_key
<< "]: size = " << size;
EXPECT_TRUE(l_tablet.Unload());

TabletIO r_tablet(split_key, key_end);
TabletIO r_tablet(split_key, key_end, tablet_path);
EXPECT_TRUE(r_tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
r_tablet.GetDataSize(&size, NULL, &status);
Expand All @@ -222,7 +222,7 @@ TEST_F(TabletIOTest, OverWrite) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand All @@ -248,7 +248,7 @@ TEST_F(TabletIOTest, Compact) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand All @@ -264,7 +264,7 @@ TEST_F(TabletIOTest, Compact) {
// open another scope
std::string new_key_start = StringFormat("%011llu", 5); // NumberToString(500);
std::string new_key_end = StringFormat("%011llu", 50); // NumberToString(800);
TabletIO new_tablet(new_key_start, new_key_end);
TabletIO new_tablet(new_key_start, new_key_end, tablet_path);
EXPECT_TRUE(new_tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
EXPECT_TRUE(new_tablet.Compact(0, &status));
Expand Down Expand Up @@ -294,7 +294,7 @@ TEST_F(TabletIOTest, LowLevelScan) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(GetTableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand Down Expand Up @@ -377,7 +377,7 @@ TEST_F(TabletIOTest, SplitToSubTable) {
StatusCode status;
uint64_t size = 0;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(TableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand Down Expand Up @@ -413,7 +413,7 @@ TEST_F(TabletIOTest, SplitToSubTable) {
parent_tablet.push_back(1);

// 1. load sub-table 1
TabletIO l_tablet(key_start, split_key);
TabletIO l_tablet(key_start, split_key, split_path_1);
EXPECT_TRUE(l_tablet.Load(TableSchema(), split_path_1, parent_tablet,
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
l_tablet.GetDataSize(&size, NULL, &status);
Expand All @@ -431,7 +431,7 @@ TEST_F(TabletIOTest, SplitToSubTable) {
EXPECT_TRUE(l_tablet.Unload());

// 2. load sub-table 2
TabletIO r_tablet(split_key, key_end);
TabletIO r_tablet(split_key, key_end, split_path_2);
EXPECT_TRUE(r_tablet.Load(TableSchema(), split_path_2, parent_tablet,
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));
r_tablet.GetDataSize(&size, NULL, &status);
Expand Down
4 changes: 2 additions & 2 deletions src/io/test/tablet_scanner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ TEST_F(TabletScannerTest, General) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(GetTableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand All @@ -240,7 +240,7 @@ TEST_F(TabletScannerTest, CacheEvict) {
std::string key_end = "";
StatusCode status;

TabletIO tablet(key_start, key_end);
TabletIO tablet(key_start, key_end, tablet_path);
EXPECT_TRUE(tablet.Load(GetTableSchema(), tablet_path, std::vector<uint64_t>(),
empty_snaphsots_, empty_rollback_, NULL, NULL, NULL, &status));

Expand Down
2 changes: 1 addition & 1 deletion src/tabletnode/tablet_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool TabletManager::AddTablet(const std::string& table_name,
SetStatusCode(kTableExist, status);
return false;
}
*tablet_io = tablet_list_[tablet_range] = new io::TabletIO(key_start, key_end);
*tablet_io = tablet_list_[tablet_range] = new io::TabletIO(key_start, key_end, table_path);
(*tablet_io)->AddRef();
return true;
}
Expand Down
4 changes: 1 addition & 3 deletions src/tabletnode/tabletnode_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,9 @@ bool TabletNodeEntry::StartServer() {
}

void TabletNodeEntry::ShutdownServer() {
tabletnode_impl_->Exit();
LOG(INFO) << "shut down server";
// StopServer要保证调用后, 不会再调用serveice的任何方法.
rpc_server_->Stop();
tabletnode_impl_->Exit();
tabletnode_impl_.reset();
LOG(INFO) << "TabletNodeEntry stop done!";
}

Expand Down
51 changes: 39 additions & 12 deletions src/tabletnode/tabletnode_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "db/filename.h"
#include "db/table_cache.h"
#include "common/thread.h"
#include "io/io_utils.h"
#include "io/utils_leveldb.h"
#include "leveldb/cache.h"
Expand Down Expand Up @@ -198,23 +199,49 @@ void TabletNodeImpl::InitCacheSystem() {
}

bool TabletNodeImpl::Exit() {
thread_pool_.reset();
std::vector<io::TabletIO*> tablet_ios;
tablet_manager_->GetAllTablets(&tablet_ios);

std::vector<TabletMeta*> tablet_meta_list;
tablet_manager_->GetAllTabletMeta(&tablet_meta_list);
std::vector<TabletMeta*>::iterator it = tablet_meta_list.begin();
for (; it != tablet_meta_list.end(); ++it) {
TabletMeta*& tablet_meta = *it;
StatusCode status = kTabletNodeOk;
bool ret = UnloadTablet(tablet_meta->table_name(),
tablet_meta->key_range().key_start(),
tablet_meta->key_range().key_end(), &status);
LOG(INFO) << "unload tablet [" << tablet_meta->path() << "] return " << ret;
delete tablet_meta;
std::vector<common::Thread> unload_threads;
unload_threads.resize(tablet_ios.size());

Counter worker_count;
worker_count.Set(tablet_ios.size());

for (uint32_t i = 0; i < tablet_ios.size(); ++i) {
io::TabletIO* tablet_io = tablet_ios[i];
common::Thread& thread = unload_threads[i];
thread.Start(boost::bind(&TabletNodeImpl::UnloadTabletProc,
this, tablet_io, &worker_count));
}
int64_t print_ms_ = get_millis();
int64_t left = 0;
while ((left = worker_count.Get()) > 0) {
if (get_millis() - print_ms_ > 1000) {
LOG(INFO) << "[Exit] " << left << " tablets are still unloading ...";
print_ms_ = get_millis();
}
ThisThread::Sleep(100);
}
for (uint32_t i = 0; i < tablet_ios.size(); ++i) {
unload_threads[i].Join();
}
return true;
}

void TabletNodeImpl::UnloadTabletProc(io::TabletIO* tablet_io, Counter* worker_count) {
LOG(INFO) << "begin to unload tablet: " << *tablet_io;
StatusCode status;
if (!tablet_io->Unload(&status)) {
LOG(ERROR) << "fail to unload tablet: " << *tablet_io
<< ", status: " << StatusCodeToString(status);
} else {
LOG(INFO) << "unload tablet success: " << *tablet_io;
}
tablet_io->DecRef();
worker_count->Dec();
}

void TabletNodeImpl::LoadTablet(const LoadTabletRequest* request,
LoadTabletResponse* response,
google::protobuf::Closure* done) {
Expand Down
3 changes: 3 additions & 0 deletions src/tabletnode/tabletnode_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ class TabletNodeImpl {
const std::set<std::string> active_tablets);

bool ApplySchema(const UpdateRequest* request);

void UnloadTabletProc(io::TabletIO* tablet_io, Counter* worker_count);

private:
mutable Mutex status_mutex_;
TabletNodeStatus status_;
Expand Down

0 comments on commit 311d81b

Please sign in to comment.