Skip to content

Commit

Permalink
chore: Finish with GCS::ListBuckets
Browse files Browse the repository at this point in the history
  • Loading branch information
romange committed Jul 11, 2024
1 parent 5c80d30 commit bc8f06b
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 41 deletions.
11 changes: 6 additions & 5 deletions examples/gcs_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ void Run(SSL_CTX* ctx) {
cloud::GCS gcs(&provider, ctx, pb);
ec = gcs.Connect(connect_ms);
CHECK(!ec) << "Could not connect " << ec;
auto res = gcs.ListBuckets();
CHECK(res) << res.error().message();
for (auto v : *res) {
CONSOLE_INFO << v;
}
auto cb = [](std::string_view bname) {
CONSOLE_INFO << bname;
};

ec = gcs.ListBuckets(cb);
CHECK(!ec) << ec.message();
}

int main(int argc, char** argv) {
Expand Down
115 changes: 81 additions & 34 deletions util/cloud/gcp/gcs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ auto Unexpected(std::errc code) {
return nonstd::make_unexpected(make_error_code(code));
}

#define RETURN_UNEXPECTED(x) do { \
auto ec = (x); \
if (ec) \
return nonstd::make_unexpected(ec); \
} while(false)

#define RETURN_ERROR(x) do { \
auto ec = (x); \
if (ec) \
return ec; \
} while (false)

string AuthHeader(string_view access_token) {
return absl::StrCat("Bearer ", access_token);
}
Expand Down Expand Up @@ -162,35 +174,34 @@ io::Result<TokenTtl> ParseTokenResponse(std::string&& response) {
return result;
}

template <typename RespBody>
error_code SendWithToken(GCPCredsProvider* provider, http::Client* client, EmptyRequest* req, h2::response<RespBody>* resp) {
using EmptyParserPtr = std::unique_ptr<h2::response_parser<h2::empty_body>>;
io::Result<EmptyParserPtr> SendWithToken(GCPCredsProvider* provider, http::Client* client,
EmptyRequest* req) {
error_code ec;
for (unsigned i = 0; i < 2; ++i) { // Iterate for possible token refresh.
VLOG(1) << "HttpReq" << i << ": " << *req << ", socket " << client->native_handle();

error_code ec = client->Send(*req, resp);
if (ec) {
return ec;
}
VLOG(1) << "HttpResp" << i << ": " << *resp;
RETURN_UNEXPECTED(client->Send(*req));
EmptyParserPtr parser(new h2::response_parser<h2::empty_body>());
RETURN_UNEXPECTED(client->ReadHeader(parser.get()));

if (resp->result() == h2::status::ok) {
break;
};
VLOG(1) << "RespHeader" << i << ": " << parser.get();

if (IsUnauthorized(*resp)) {
ec = provider->RefreshToken(client->proactor());
if (ec) {
return ec;
}
if (parser->get().result() == h2::status::ok) {
return parser;
};

*resp = {};
if (IsUnauthorized(parser->get())) {
RETURN_UNEXPECTED(provider->RefreshToken(client->proactor()));
req->set(h2::field::authorization, AuthHeader(provider->access_token()));

continue;
}
LOG(FATAL) << "Unexpected response " << *resp;
ec = make_error_code(errc::bad_message);
LOG(DFATAL) << "Unexpected response " << parser.get();
}
return {};

return nonstd::make_unexpected(ec);
}

} // namespace
Expand Down Expand Up @@ -218,18 +229,16 @@ error_code GCPCredsProvider::Init(unsigned connect_ms, fb2::ProactorBase* pb) {
use_instance_metadata_ = true;
LOG(FATAL) << "TBD: do not support reading from instance metadata";
} else {
error_code ec = LoadGCPConfig(&account_id_, &project_id_);
if (ec)
return ec;
RETURN_ERROR(LoadGCPConfig(&account_id_, &project_id_));

if (account_id_.empty() || project_id_.empty()) {
LOG(WARNING) << "gcloud config file is not valid";
return make_error_code(errc::not_supported);
}
string adc_file = absl::StrCat(*root_path, "/legacy_credentials/", account_id_, "/adc.json");
VLOG(1) << "ADC file: " << adc_file;
ec = ParseADC(adc_file, &client_id_, &client_secret_, &refresh_token_);
if (ec)
return ec;
RETURN_ERROR(ParseADC(adc_file, &client_id_, &client_secret_, &refresh_token_));

if (client_id_.empty() || client_secret_.empty() || refresh_token_.empty()) {
LOG(WARNING) << "Bad ADC file " << adc_file;
return make_error_code(errc::bad_message);
Expand Down Expand Up @@ -264,9 +273,8 @@ error_code GCPCredsProvider::RefreshToken(fb2::ProactorBase* pb) {
VLOG(1) << "Req: " << req;

h2::response<h2::string_body> resp;
ec = https_client.Send(req, &resp);
if (ec)
return ec;
RETURN_ERROR(https_client.Send(req, &resp));

if (resp.result() != h2::status::ok) {
LOG(WARNING) << "Http error: " << string(resp.reason()) << ", Body: ", resp.body();
return make_error_code(errc::permission_denied);
Expand Down Expand Up @@ -297,18 +305,57 @@ std::error_code GCS::Connect(unsigned msec) {
return client_->Connect(kDomain, "443", ssl_ctx_);
}

auto GCS::ListBuckets() -> ListBucketResult {
error_code GCS::ListBuckets(ListBucketCb cb) {
string url = absl::StrCat("/storage/v1/b?project=", creds_provider_.project_id());
absl::StrAppend(&url, "&fields=items,nextPageToken");
absl::StrAppend(&url, "&maxResults=50&fields=items,nextPageToken");

auto http_req = PrepareRequest(h2::verb::get, url, creds_provider_.access_token());

rj::Document doc;
h2::response<h2::string_body> resp_msg;
error_code ec = SendWithToken(&creds_provider_, client_.get(), &http_req, &resp_msg);
if (ec)
return nonstd::make_unexpected(ec);
VLOG(2) << "ListResponse: " << resp_msg.body();

while (true) {
io::Result<EmptyParserPtr> parse_res =
SendWithToken(&creds_provider_, client_.get(), &http_req);
if (!parse_res)
return parse_res.error();
EmptyParserPtr empty_parser = std::move(*parse_res);
h2::response_parser<h2::string_body> resp(std::move(*empty_parser));
RETURN_ERROR(client_->Recv(&resp));

auto msg = resp.release();

VLOG(2) << "ListResponse: " << msg.body();

doc.ParseInsitu(&msg.body().front());
if (doc.HasParseError()) {
return make_error_code(errc::bad_message);
}

auto it = doc.FindMember("items");
if (it == doc.MemberEnd())
break;

const auto& val = it->value;
if (!val.IsArray()) {
return make_error_code(errc::bad_message);
}
auto array = val.GetArray();

for (size_t i = 0; i < array.Size(); ++i) {
const auto& item = array[i];
auto it = item.FindMember("name");
if (it != item.MemberEnd()) {
cb(string_view{it->value.GetString(), it->value.GetStringLength()});
}
}

it = doc.FindMember("nextPageToken");
if (it == doc.MemberEnd()) {
break;
}
absl::string_view page_token{it->value.GetString(), it->value.GetStringLength()};
http_req.target(absl::StrCat(url, "&pageToken=", page_token));
}
return {};
}

Expand Down
5 changes: 3 additions & 2 deletions util/cloud/gcp/gcs.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,15 @@ class GCPCredsProvider {

class GCS {
public:
using ListBucketResult = io::Result<std::vector<std::string>>;
using BucketItem = std::string_view;
using ListBucketCb = std::function<void(BucketItem)>;

GCS(GCPCredsProvider* creds_provider, SSL_CTX* ssl_cntx, fb2::ProactorBase* pb);
~GCS();

std::error_code Connect(unsigned msec);

ListBucketResult ListBuckets();
std::error_code ListBuckets(ListBucketCb cb);

private:
GCPCredsProvider& creds_provider_;
Expand Down

0 comments on commit bc8f06b

Please sign in to comment.