Skip to content

Commit

Permalink
[yugabyte#6844] Expose BackfillJobsPB through master api
Browse files Browse the repository at this point in the history
Summary:
It is useful for backfill tests to know various details such as
whether or not the backfill timestamp has been chosen.

This can reduce flakiness in the tests, as otherwise tests
have to guess based on sleeps added intermittently.

Depends on D11348
Depends on D11303

Test Plan:
./yb_build.sh --cxx-test integration-tests_cassandra_cpp_driver-test --gtest_filter 'CppCassandraDriverTest*DeleteIndexWhileBackfilling'

./yb_build.sh --cxx-test pg_index_backfill-test

Reviewers: jason

Reviewed By: jason

Subscribers: ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D11404
  • Loading branch information
amitanandaiyer committed May 8, 2021
1 parent 557ed82 commit e2fba66
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 21 deletions.
77 changes: 77 additions & 0 deletions src/yb/integration-tests/backfill-test-util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#ifndef YB_INTEGRATION_TESTS_BACKFILL_TEST_UTIL_H
#define YB_INTEGRATION_TESTS_BACKFILL_TEST_UTIL_H

#include <algorithm>
#include <string>

#include "yb/integration-tests/external_mini_cluster.h"
#include "yb/master/master.pb.h"
#include "yb/master/master.proxy.h"
#include "yb/master/master_service.h"
#include "yb/util/test_util.h"
#include "yb/client/yb_table_name.h"

namespace yb {

// Fetches the backfill job recorded on the master for the given indexed table.
//
// The table is identified by table_id when it is non-empty, and by table_name otherwise.
//
// Returns:
//   - the status of the RPC if the call itself fails,
//   - RemoteError if the master's response carries an error (so that a failed lookup is not
//     mistaken for "no backfill job yet"),
//   - NotFound if the response contains no backfill job,
//   - the single backfill job otherwise. More than one job is a fatal (CHECK) failure.
//
// Marked inline because it is defined in a header that may be included from more than one
// translation unit.
inline Result<master::BackfillJobPB> GetBackfillJobs(
    std::shared_ptr<master::MasterServiceProxy> proxy,
    const client::YBTableName& table_name,
    const TableId& table_id = "") {
  master::GetBackfillJobsRequestPB req;
  master::GetBackfillJobsResponsePB resp;
  rpc::RpcController rpc;
  constexpr auto kAdminRpcTimeout = 5;
  rpc.set_timeout(MonoDelta::FromSeconds(kAdminRpcTimeout));

  // Prefer the table id when the caller supplied one; fall back to the table name.
  if (!table_id.empty()) {
    req.mutable_table_identifier()->set_table_id(table_id);
  } else {
    table_name.SetIntoTableIdentifierPB(req.mutable_table_identifier());
  }
  RETURN_NOT_OK(proxy->GetBackfillJobs(req, &resp, &rpc));

  // Surface an application-level error reported in the response instead of reporting it as
  // an absent backfill job.
  if (resp.has_error()) {
    return STATUS(RemoteError, "GetBackfillJobs failed", resp.error().ShortDebugString());
  }
  if (resp.backfill_jobs_size() == 0) {
    return STATUS(NotFound, "No backfill job running yet");
  }
  CHECK_EQ(resp.backfill_jobs_size(), 1) << "As of now only one outstanding backfill "
                                         << "job should be pending.";
  return resp.backfill_jobs(0);
}

// Waits until the backfill job for the given indexed table reports a backfilling timestamp.
//
// Polls GetBackfillJobs() through the supplied master proxy for up to max_wait. Any failure to
// fetch the job (including "no job yet") is treated as "not ready" and polled again.
// The table is identified by table_id when it is non-empty, and by table_name otherwise.
//
// Marked inline because it is defined in a header that may be included from more than one
// translation unit.
inline CHECKED_STATUS WaitForBackfillSafeTimeOn(
    std::shared_ptr<master::MasterServiceProxy> proxy,
    const client::YBTableName& table_name,
    const TableId& table_id = "",
    MonoDelta max_wait = MonoDelta::FromSeconds(60)) {
  return WaitFor(
      [proxy, &table_name, &table_id]() {
        Result<master::BackfillJobPB> backfill_job = GetBackfillJobs(proxy, table_name, table_id);
        // Ready only once a job exists and its backfilling timestamp has been set.
        return backfill_job && backfill_job->has_backfilling_timestamp();
      },
      max_wait, "waiting for backfill to get past GetSafeTime.");
}

// Convenience overload that waits on an ExternalMiniCluster.
//
// Uses the leader master's proxy, the same proxy the YSQL backfill test passes to the
// proxy-based overload, and forwards all other arguments unchanged.
// NOTE(review): the proxy is resolved once, before waiting starts; presumably a leader change
// during the wait is not followed -- confirm this is acceptable for callers.
//
// Marked inline because it is defined in a header that may be included from more than one
// translation unit.
inline CHECKED_STATUS WaitForBackfillSafeTimeOn(
    ExternalMiniCluster* cluster,
    const client::YBTableName& table_name,
    const TableId& table_id = "",
    MonoDelta max_wait = MonoDelta::FromSeconds(60)) {
  return WaitForBackfillSafeTimeOn(
      cluster->GetLeaderMasterProxy(), table_name, table_id, max_wait);
}

} // namespace yb

#endif // YB_INTEGRATION_TESTS_BACKFILL_TEST_UTIL_H
3 changes: 2 additions & 1 deletion src/yb/integration-tests/cassandra_cpp_driver-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "yb/server/hybrid_clock.h"
#include "yb/server/clock.h"

#include "yb/integration-tests/backfill-test-util.h"
#include "yb/integration-tests/external_mini_cluster-itest-base.h"
#include "yb/integration-tests/cql_test_util.h"

Expand Down Expand Up @@ -1601,7 +1602,7 @@ TEST_F_EX(
auto res = client_->WaitUntilIndexPermissionsAtLeast(
table_name, index_table_name, IndexPermissions::INDEX_PERM_DO_BACKFILL, 50ms /* max_wait */);
// Allow backfill to get past GetSafeTime
SleepFor(MonoDelta::FromMilliseconds(50));
ASSERT_OK(WaitForBackfillSafeTimeOn(cluster_.get(), table_name));

ASSERT_OK(session_.ExecuteQuery("drop index test_table_index_by_v"));

Expand Down
6 changes: 6 additions & 0 deletions src/yb/integration-tests/external_mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,12 @@ std::shared_ptr<ConsensusServiceProxy> ExternalMiniCluster::GetLeaderConsensusPr
return std::make_shared<ConsensusServiceProxy>(proxy_cache_.get(), leader_master_sock);
}

// Builds a master service proxy bound to the RPC address of the leader master.
std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::GetLeaderMasterProxy() {
  return std::make_shared<MasterServiceProxy>(
      proxy_cache_.get(), GetLeaderMaster()->bound_rpc_addr());
}

std::shared_ptr<ConsensusServiceProxy> ExternalMiniCluster::GetConsensusProxy(
scoped_refptr<ExternalMaster> master) {
auto master_sock = master->bound_rpc_addr();
Expand Down
3 changes: 3 additions & 0 deletions src/yb/integration-tests/external_mini_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ class ExternalMiniCluster : public MiniClusterBase {
// Get the master leader consensus proxy.
std::shared_ptr<consensus::ConsensusServiceProxy> GetLeaderConsensusProxy();

// Get the master leader master service proxy.
std::shared_ptr<master::MasterServiceProxy> GetLeaderMasterProxy();

// Get the given master's consensus proxy.
std::shared_ptr<consensus::ConsensusServiceProxy> GetConsensusProxy(
scoped_refptr<ExternalMaster> master);
Expand Down
19 changes: 19 additions & 0 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3774,6 +3774,25 @@ Status CatalogManager::BackfillIndex(
this, indexed_table, {index_info_pb}, boost::none);
}

// Copies the backfill jobs recorded on the requested indexed table into the response.
//
// req:  carries the identifier of the indexed table to look up.
// resp: receives the table's backfill jobs, or an OBJECT_NOT_FOUND error if the lookup
//       yields no table.
// rpc:  not used by this handler.
//
// A failure returned by FindTable() is propagated to the caller as-is.
Status CatalogManager::GetBackfillJobs(
    const GetBackfillJobsRequestPB* req,
    GetBackfillJobsResponsePB* resp,
    rpc::RpcContext* rpc) {
  // The identifier is only read, so bind to it rather than copying the protobuf.
  const TableIdentifierPB& table_id = req->table_identifier();

  scoped_refptr<TableInfo> indexed_table = VERIFY_RESULT(FindTable(table_id));
  if (indexed_table == nullptr) {
    // Use the substituting macro so that the "$0" placeholder is replaced by the identifier
    // instead of appearing literally in the message.
    const Status s = STATUS_SUBSTITUTE(
        NotFound, "Requested table $0 does not exist", table_id.ShortDebugString());
    return SetupError(resp->mutable_error(), MasterErrorPB::OBJECT_NOT_FOUND, s);
  }

  {
    // Keep the read lock scoped to the copy only.
    auto l = indexed_table->LockForRead();
    resp->mutable_backfill_jobs()->CopyFrom(l->pb.backfill_jobs());
  }
  return Status::OK();
}

Status CatalogManager::LaunchBackfillIndexForTable(
const LaunchBackfillIndexForTableRequestPB* req,
LaunchBackfillIndexForTableResponsePB* resp,
Expand Down
19 changes: 11 additions & 8 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,17 +241,20 @@ class CatalogManager : public tserver::TabletPeerLookupIf {

// Backfill the specified index. Currently only supported for YSQL. YCQL does not need this as
// master automatically runs backfill according to the DocDB permissions.
CHECKED_STATUS BackfillIndex(
const BackfillIndexRequestPB* req,
BackfillIndexResponsePB* resp,
rpc::RpcContext* rpc);
CHECKED_STATUS BackfillIndex(const BackfillIndexRequestPB* req,
BackfillIndexResponsePB* resp,
rpc::RpcContext* rpc);

// Gets the backfill jobs state associated with the requested table.
CHECKED_STATUS GetBackfillJobs(const GetBackfillJobsRequestPB* req,
GetBackfillJobsResponsePB* resp,
rpc::RpcContext* rpc);

// Backfill the indexes for the specified table.
// Used for backfilling YCQL deferred indexes when triggered from yb-admin.
CHECKED_STATUS LaunchBackfillIndexForTable(
const LaunchBackfillIndexForTableRequestPB* req,
LaunchBackfillIndexForTableResponsePB* resp,
rpc::RpcContext* rpc);
CHECKED_STATUS LaunchBackfillIndexForTable(const LaunchBackfillIndexForTableRequestPB* req,
LaunchBackfillIndexForTableResponsePB* resp,
rpc::RpcContext* rpc);

// Delete the specified table.
//
Expand Down
12 changes: 12 additions & 0 deletions src/yb/master/master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1075,6 +1075,17 @@ message LaunchBackfillIndexForTableResponsePB {
optional MasterErrorPB error = 1;
}

// Request for the backfill jobs recorded on an indexed table.
message GetBackfillJobsRequestPB {
// Identifies the indexed table (not the index) whose backfill jobs are requested.
// Callers may fill in either the table id or the table name.
optional TableIdentifierPB table_identifier = 1;
}

// Response carrying the backfill jobs recorded on the requested indexed table.
message GetBackfillJobsResponsePB {
// Backfill jobs copied from the indexed table's metadata; empty when none is recorded.
repeated BackfillJobPB backfill_jobs = 1;
// The error, if an error occurred with this request.
optional MasterErrorPB error = 2;
}

// Delete table request (including index table).
message DeleteTableRequestPB {
required TableIdentifierPB table = 1;
Expand Down Expand Up @@ -2052,6 +2063,7 @@ service MasterService {
rpc BackfillIndex(BackfillIndexRequestPB) returns (BackfillIndexResponsePB);
rpc LaunchBackfillIndexForTable(LaunchBackfillIndexForTableRequestPB)
returns (LaunchBackfillIndexForTableResponsePB);
rpc GetBackfillJobs(GetBackfillJobsRequestPB) returns (GetBackfillJobsResponsePB);

rpc DeleteTable(DeleteTableRequestPB) returns (DeleteTableResponsePB);
rpc IsDeleteTableDone(IsDeleteTableDoneRequestPB) returns (IsDeleteTableDoneResponsePB);
Expand Down
1 change: 1 addition & 0 deletions src/yb/master/master_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ BOOST_PP_SEQ_FOR_EACH(
(IsTruncateTableDone)
(BackfillIndex)
(LaunchBackfillIndexForTable)
(GetBackfillJobs)
(DeleteTable)
(IsDeleteTableDone)
(AlterTable)
Expand Down
3 changes: 3 additions & 0 deletions src/yb/master/master_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class MasterServiceImpl : public MasterServiceIf,
void BackfillIndex(const BackfillIndexRequestPB* req,
BackfillIndexResponsePB* resp,
rpc::RpcContext rpc) override;
void GetBackfillJobs(const GetBackfillJobsRequestPB* req,
GetBackfillJobsResponsePB* resp,
rpc::RpcContext rpc) override;
void LaunchBackfillIndexForTable(const LaunchBackfillIndexForTableRequestPB* req,
LaunchBackfillIndexForTableResponsePB* resp,
rpc::RpcContext rpc) override;
Expand Down
26 changes: 14 additions & 12 deletions src/yb/yql/pgwrapper/pg_index_backfill-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "yb/client/table.h"
#include "yb/gutil/strings/join.h"
#include "yb/integration-tests/backfill-test-util.h"
#include "yb/util/backoff_waiter.h"
#include "yb/util/monotime.h"
#include "yb/util/stol_utils.h"
Expand All @@ -35,6 +36,7 @@ constexpr auto kColoDbName = "colodb";
constexpr auto kDatabaseName = "yugabyte";
constexpr auto kIndexName = "iii";
constexpr auto kTableName = "ttt";
const client::YBTableName kYBTableName(YQLDatabase::YQL_DATABASE_PGSQL, kDatabaseName, kTableName);

} // namespace

Expand Down Expand Up @@ -766,7 +768,8 @@ class PgIndexBackfillSlow : public PgIndexBackfillTest {
return true;
}

CHECKED_STATUS WaitForBackfillSafeTime(const std::string& index_name) {
CHECKED_STATUS WaitForBackfillSafeTime(
const client::YBTableName& table_name, const std::string& index_name) {
LOG(INFO) << "Waiting for pg_index indislive to be true";
RETURN_NOT_OK(WaitFor(
[this, &index_name] {
Expand All @@ -793,12 +796,11 @@ class PgIndexBackfillSlow : public PgIndexBackfillTest {
LOG(INFO) << "Waiting till (approx) the end of the delay after committing indisready true";
SleepFor(kIndexStateFlagsUpdateDelay);

// Give the backfill stage enough time to get a read time.
// TODO(jason): come up with some way to wait until the read time is chosen rather than relying
// on a brittle sleep (issue #6844).
LOG(INFO) << "Waiting out half the delay of executing backfill so that we're hopefully after "
<< "getting the safe read time and before executing backfill";
SleepFor(kBackfillDelay / 2);
auto client = VERIFY_RESULT(cluster_->CreateClient());
const std::string table_id = VERIFY_RESULT(
GetTableIdByTableName(client.get(), table_name.namespace_name(), table_name.table_name()));
RETURN_NOT_OK(
WaitForBackfillSafeTimeOn(cluster_->GetLeaderMasterProxy(), table_name, table_id));

return Status::OK();
}
Expand Down Expand Up @@ -878,7 +880,7 @@ TEST_F_EX(PgIndexBackfillTest,
});
thread_holder_.AddThreadFunctor([this] {
LOG(INFO) << "Begin write thread";
ASSERT_OK(WaitForBackfillSafeTime(kIndexName));
ASSERT_OK(WaitForBackfillSafeTime(kYBTableName, kIndexName));

LOG(INFO) << "Updating row";
ASSERT_OK(conn_->ExecuteFormat("UPDATE $0 SET j = j + 100 WHERE i = 3", kTableName));
Expand Down Expand Up @@ -1234,7 +1236,7 @@ TEST_F_EX(PgIndexBackfillTest,
});
thread_holder_.AddThreadFunctor([this] {
LOG(INFO) << "Begin write thread";
ASSERT_OK(WaitForBackfillSafeTime(kIndexName));
ASSERT_OK(WaitForBackfillSafeTime(kYBTableName, kIndexName));

LOG(INFO) << "Deleting row";
ASSERT_OK(conn_->ExecuteFormat("DELETE FROM $0 WHERE i = 1", kTableName));
Expand Down Expand Up @@ -1286,7 +1288,7 @@ TEST_F_EX(PgIndexBackfillTest,

thread_holder_.AddThreadFunctor([this, &same_ts_conn, &diff_ts_conn] {
LOG(INFO) << "Begin select thread";
ASSERT_OK(WaitForBackfillSafeTime(kIndexName));
ASSERT_OK(WaitForBackfillSafeTime(kYBTableName, kIndexName));

LOG(INFO) << "Load DocDB table/index schemas to pggate cache for the other connections";
ASSERT_RESULT(same_ts_conn.FetchFormat("SELECT * FROM $0 WHERE i = 2", kTableName));
Expand Down Expand Up @@ -1440,7 +1442,7 @@ TEST_F_EX(PgIndexBackfillTest,
});
thread_holder_.AddThreadFunctor([this] {
LOG(INFO) << "Begin drop thread";
ASSERT_OK(WaitForBackfillSafeTime(kIndexName));
ASSERT_OK(WaitForBackfillSafeTime(kYBTableName, kIndexName));

LOG(INFO) << "Drop index";
ASSERT_OK(conn_->ExecuteFormat("DROP INDEX $0", kIndexName));
Expand Down Expand Up @@ -1486,7 +1488,7 @@ TEST_F_EX(PgIndexBackfillTest,
});
thread_holder_.AddThreadFunctor([this] {
LOG(INFO) << "Begin master leader stepdown thread";
ASSERT_OK(WaitForBackfillSafeTime(kIndexName));
ASSERT_OK(WaitForBackfillSafeTime(kYBTableName, kIndexName));

LOG(INFO) << "Doing master leader stepdown";
tserver::TabletServerErrorPB::Code error_code;
Expand Down

0 comments on commit e2fba66

Please sign in to comment.