Skip to content

Commit

Permalink
PR comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
Tom-Newton committed Oct 18, 2023
1 parent c3405f5 commit cad62df
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 26 deletions.
25 changes: 12 additions & 13 deletions cpp/src/arrow/filesystem/azurefs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& re

class ObjectInputFile final : public io::RandomAccessFile {
public:
ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient>& blob_client,
ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient> blob_client,
const io::IOContext& io_context, const AzurePath& path,
int64_t size = kNoSize)
: blob_client_(std::move(blob_client)),
Expand Down Expand Up @@ -191,14 +191,15 @@ class ObjectInputFile final : public io::RandomAccessFile {
}
}

Status CheckClosed() const {
Status CheckClosed(const char* action) const {
if (closed_) {
return Status::Invalid("Operation on closed stream");
return Status::Invalid("Cannot ", action, " on closed file.");
}
return Status::OK();
}

Status CheckPosition(int64_t position, const char* action) const {
DCHECK_GE(content_length_, 0);
if (position < 0) {
return Status::Invalid("Cannot ", action, " from negative position");
}
Expand Down Expand Up @@ -228,25 +229,25 @@ class ObjectInputFile final : public io::RandomAccessFile {
bool closed() const override { return closed_; }

Result<int64_t> Tell() const override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckClosed("tell"));
return pos_;
}

Result<int64_t> GetSize() override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckClosed("size"));
return content_length_;
}

Status Seek(int64_t position) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckClosed("seek"));
RETURN_NOT_OK(CheckPosition(position, "seek"));

pos_ = position;
return Status::OK();
}

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckClosed("read"));
RETURN_NOT_OK(CheckPosition(position, "read"));

nbytes = std::min(nbytes, content_length_ - position);
Expand All @@ -258,11 +259,9 @@ class ObjectInputFile final : public io::RandomAccessFile {
Azure::Core::Http::HttpRange range{.Offset = position, .Length = nbytes};
Azure::Storage::Blobs::DownloadBlobToOptions download_options{.Range = range};
try {
auto result =
blob_client_
->DownloadTo(reinterpret_cast<uint8_t*>(out), nbytes, download_options)
.Value;
return result.ContentRange.Length.Value();
return blob_client_
->DownloadTo(reinterpret_cast<uint8_t*>(out), nbytes, download_options)
.Value.ContentRange.Length.Value();
} catch (const Azure::Storage::StorageException& exception) {
return ErrorToStatus("When reading from '" + blob_client_->GetUrl() +
"' at position " + std::to_string(position) + " for " +
Expand All @@ -272,7 +271,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
}

Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckClosed("read"));
RETURN_NOT_OK(CheckPosition(position, "read"));

// No need to allocate more than the remaining number of bytes
Expand Down
24 changes: 11 additions & 13 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,29 @@ class TestAzureFileSystem : public ::testing::Test {
public:
std::shared_ptr<FileSystem> fs_;
std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
AzureOptions options_;
std::mt19937_64 generator_;
std::string container_name_;

void MakeFileSystem() {
TestAzureFileSystem() : generator_(std::random_device()()) {}

AzureOptions MakeOptions() {
const std::string& account_name = GetAzuriteEnv()->account_name();
const std::string& account_key = GetAzuriteEnv()->account_key();
options_.backend = AzureBackend::Azurite;
ASSERT_OK(options_.ConfigureAccountKeyCredentials(account_name, account_key));
AzureOptions options;
options.backend = AzureBackend::Azurite;
ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name, account_key));
return options;
}

void SetUp() override {
ASSERT_THAT(GetAzuriteEnv(), NotNull());
ASSERT_OK(GetAzuriteEnv()->status());

MakeFileSystem();
generator_ = std::mt19937_64(std::random_device()());
container_name_ = RandomChars(32);
auto options = MakeOptions();
service_client_ = std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
options_.account_blob_url, options_.storage_credentials_provider);
ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options_));
options.account_blob_url, options.storage_credentials_provider);
ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options));
auto container_client = service_client_->GetBlobContainerClient(container_name_);
container_client.CreateIfNotExists();

Expand Down Expand Up @@ -199,10 +201,6 @@ class TestAzureFileSystem : public ::testing::Test {
return line;
}

uint8_t RandomInteger() {
return std::uniform_int_distribution<std::uint8_t>()(generator_);
}

std::size_t RandomIndex(std::size_t end) {
return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
}
Expand All @@ -215,7 +213,7 @@ class TestAzureFileSystem : public ::testing::Test {
return s;
}

void UploadLines(std::vector<std::string> lines, const char* path_to_file,
void UploadLines(const std::vector<std::string>& lines, const char* path_to_file,
int total_size) {
// TODO: Switch to using Azure filesystem to write once it's implemented.
auto blob_client = service_client_->GetBlobContainerClient(PreexistingContainerName())
Expand Down

0 comments on commit cad62df

Please sign in to comment.