Skip to content

Commit

Permalink
chore: support instance metadata (#298)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>

chore: refresh auth-token automatically
  • Loading branch information
romange authored Jul 14, 2024
1 parent 2290be2 commit 9be494f
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 97 deletions.
11 changes: 4 additions & 7 deletions examples/gcs_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,19 @@ void Run(SSL_CTX* ctx) {
CHECK(!ec) << "Could not load credentials " << ec.message();

cloud::GCS gcs(&provider, ctx, pb);
ec = gcs.Connect(connect_ms);
CHECK(!ec) << "Could not connect " << ec;

string prefix = GetFlag(FLAGS_prefix);
if (!prefix.empty()) {
string bucket = GetFlag(FLAGS_bucket);
auto conn_pool = gcs.CreateConnectionPool();
CHECK(!bucket.empty());
string bucket = GetFlag(FLAGS_bucket);

if (!bucket.empty()) {
auto conn_pool = gcs.GetConnectionPool();
if (GetFlag(FLAGS_write) > 0) {
auto src = io::ReadFileToString("/proc/self/exe");
CHECK(src);
for (unsigned i = 0; i < GetFlag(FLAGS_write); ++i) {
string dest_key = absl::StrCat(prefix, "_", i);
io::Result<io::WriteFile*> dest_res =
cloud::OpenWriteGcsFile(bucket, dest_key, &provider, conn_pool.get());
cloud::OpenWriteGcsFile(bucket, dest_key, &provider, conn_pool);
CHECK(dest_res) << "Could not open " << dest_key << " " << dest_res.error().message();
unique_ptr<io::WriteFile> dest(*dest_res);
error_code ec = dest->Write(*src);
Expand Down
31 changes: 20 additions & 11 deletions util/cloud/gcp/gcp_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
#include <boost/beast/http/string_body.hpp>

#include "base/logging.h"

#include "util/cloud/gcp/gcp_creds_provider.h"
#include "util/http/http_client.h"

namespace util::cloud {
using namespace std;
Expand Down Expand Up @@ -66,20 +68,26 @@ std::error_code DynamicBodyRequestImpl::Send(http::Client* client) {

} // namespace detail

RobustSender::RobustSender(unsigned num_iterations, GCPCredsProvider* provider)
: num_iterations_(num_iterations), provider_(provider) {
RobustSender::RobustSender(http::ClientPool* pool, GCPCredsProvider* provider)
: pool_(pool), provider_(provider) {
}

auto RobustSender::Send(http::Client* client,
auto RobustSender::Send(unsigned num_iterations,
detail::HttpRequestBase* req) -> io::Result<HeaderParserPtr> {
error_code ec;
for (unsigned i = 0; i < num_iterations_; ++i) { // Iterate for possible token refresh.
VLOG(1) << "HttpReq " << client->host() << ": " << req->GetHeaders() << ", ["
<< client->native_handle() << "]";
for (unsigned i = 0; i < num_iterations; ++i) { // Iterate for possible token refresh.
auto res = pool_->GetHandle();
if (!res)
return nonstd::make_unexpected(res.error());

auto client_handle = std::move(res.value());

VLOG(1) << "HttpReq " << client_handle->host() << ": " << req->GetHeaders() << ", ["
<< client_handle->native_handle() << "]";

RETURN_UNEXPECTED(req->Send(client));
RETURN_UNEXPECTED(req->Send(client_handle.get()));
HeaderParserPtr parser(new h2::response_parser<h2::empty_body>());
RETURN_UNEXPECTED(client->ReadHeader(parser.get()));
RETURN_UNEXPECTED(client_handle->ReadHeader(parser.get()));
{
const auto& msg = parser->get();
VLOG(1) << "RespHeader" << i << ": " << msg;
Expand All @@ -95,18 +103,19 @@ auto RobustSender::Send(http::Client* client,

// We have some kind of error, possibly with body that needs to be drained.
h2::response_parser<h2::string_body> drainer(std::move(*parser));
RETURN_UNEXPECTED(client->Recv(&drainer));
RETURN_UNEXPECTED(client_handle->Recv(&drainer));
const auto& msg = drainer.get();

if (DoesServerPushback(msg.result())) {
LOG(INFO) << "Retrying(" << client->native_handle() << ") with " << msg;
LOG(INFO) << "Retrying(" << client_handle->native_handle() << ") with " << msg;

ThisFiber::SleepFor(100ms);
continue;
}

if (IsUnauthorized(msg)) {
RETURN_UNEXPECTED(provider_->RefreshToken(client->proactor()));
VLOG(1) << "Refreshing token";
RETURN_UNEXPECTED(provider_->RefreshToken(client_handle->proactor()));
req->SetHeader(h2::field::authorization, AuthHeader(provider_->access_token()));

continue;
Expand Down
10 changes: 6 additions & 4 deletions util/cloud/gcp/gcp_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
#pragma once

#include <boost/beast/http/empty_body.hpp>
#include <boost/beast/http/dynamic_body.hpp>
#include <boost/beast/http/parser.hpp>
#include <memory>

#include "io/io.h"
#include "util/http/http_client.h"
#include "util/http/https_client_pool.h"

namespace util::cloud {
class GCPCredsProvider;
Expand Down Expand Up @@ -99,12 +101,12 @@ class RobustSender {
using HeaderParserPtr =
std::unique_ptr<boost::beast::http::response_parser<boost::beast::http::empty_body>>;

RobustSender(unsigned num_iterations, GCPCredsProvider* provider);
RobustSender(http::ClientPool* pool, GCPCredsProvider* provider);

io::Result<HeaderParserPtr> Send(http::Client* client, detail::HttpRequestBase* req);
io::Result<HeaderParserPtr> Send(unsigned num_iterations, detail::HttpRequestBase* req);

private:
unsigned num_iterations_;
http::ClientPool* pool_;
GCPCredsProvider* provider_;
};

Expand Down
Loading

0 comments on commit 9be494f

Please sign in to comment.