fix(iceberg): Date partition value parse issue
nmahadevuni committed Feb 13, 2025
1 parent d8cac2f commit 457e82a
Showing 4 changed files with 158 additions and 36 deletions.
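
The gist of the fix: Hive encodes date partition values as ISO-8601 strings (e.g. "2018-04-06"), while Iceberg encodes them as integer days since the Unix epoch (e.g. "17627" for the same day). The old code routed every date partition value through util::fromDateString, which rejects the integer form, so the subsequent VELOX_CHECK on the parse result fired for Iceberg splits. Below is a minimal standalone sketch of the two encodings (illustration only, not part of the commit; the helper name parsePartitionDate and the include path are assumptions):

#include <folly/Conv.h>
#include "velox/type/Type.h" // assumed location of DATE()

using namespace facebook::velox;

// Hypothetical helper mirroring the branch this commit adds to
// applyPartitionFilter: integer strings are days since epoch,
// everything else is parsed as an ISO-8601 date string.
int32_t parsePartitionDate(const std::string& value, bool daysSinceEpoch) {
  return daysSinceEpoch
      ? folly::to<int32_t>(value) // Iceberg: "17627"
      : DATE()->toDays(static_cast<folly::StringPiece>(value)); // Hive: "2018-04-06"
}

// Both calls yield 17627, i.e. 2018-04-06:
//   parsePartitionDate("17627", /*daysSinceEpoch=*/true);
//   parsePartitionDate("2018-04-06", /*daysSinceEpoch=*/false);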
15 changes: 11 additions & 4 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -634,12 +634,18 @@ namespace {
 bool applyPartitionFilter(
     const TypePtr& type,
     const std::string& partitionValue,
+    bool isPartitionDateDaysSinceEpoch,
     common::Filter* filter) {
   if (type->isDate()) {
-    const auto result = util::fromDateString(
-        StringView(partitionValue), util::ParseMode::kPrestoCast);
-    VELOX_CHECK(!result.hasError());
-    return applyFilter(*filter, result.value());
+    int32_t result = 0;
+    // days_since_epoch partition values are integers in string format,
+    // e.g. Iceberg partition values.
+    if (isPartitionDateDaysSinceEpoch) {
+      result = folly::to<int32_t>(partitionValue);
+    } else {
+      result = DATE()->toDays(static_cast<folly::StringPiece>(partitionValue));
+    }
+    return applyFilter(*filter, result);
   }

   switch (type->kind()) {
@@ -701,6 +707,7 @@ bool testFilters(
       return applyPartitionFilter(
           handlesIter->second->dataType(),
           iter->second.value(),
+          handlesIter->second->isPartitionDateValueDaysSinceEpoch(),
           child->filter());
     }
     // Column is missing, most likely due to schema evolution. Or it's a
15 changes: 12 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
@@ -36,14 +36,22 @@ VectorPtr newConstantFromString(
     vector_size_t size,
     velox::memory::MemoryPool* pool,
     const std::string& sessionTimezone,
-    bool asLocalTime) {
+    bool asLocalTime,
+    bool isPartitionDateDaysSinceEpoch = false) {
   using T = typename TypeTraits<kind>::NativeType;
   if (!value.has_value()) {
     return std::make_shared<ConstantVector<T>>(pool, size, true, type, T());
   }

   if (type->isDate()) {
-    auto days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
+    int32_t days = 0;
+    // For Iceberg, the date partition values are already in daysSinceEpoch
+    // form.
+    if (isPartitionDateDaysSinceEpoch) {
+      days = folly::to<int32_t>(value.value());
+    } else {
+      days = DATE()->toDays(static_cast<folly::StringPiece>(value.value()));
+    }
     return std::make_shared<ConstantVector<int32_t>>(
         pool, size, false, type, std::move(days));
   }
@@ -402,7 +410,8 @@ void SplitReader::setPartitionValue(
       connectorQueryCtx_->memoryPool(),
       connectorQueryCtx_->sessionTimezone(),
       hiveConfig_->readTimestampPartitionValueAsLocalTime(
-          connectorQueryCtx_->sessionProperties()));
+          connectorQueryCtx_->sessionProperties()),
+      it->second->isPartitionDateValueDaysSinceEpoch());
   spec->setConstantValue(constant);
 }
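
With the flag set, the SplitReader path above materializes the date partition constant without any date-string parsing. A minimal sketch of what the new branch produces (illustration only; assumes a velox::memory::MemoryPool* pool from a test fixture):

// "17627" is already days since epoch (2018-04-06), so a plain integer
// conversion suffices. folly::to<int32_t> throws folly::ConversionError on
// non-integer input such as "2018-04-06", so a mis-flagged value fails fast
// rather than misparsing silently.
int32_t days = folly::to<int32_t>("17627");
auto constant = std::make_shared<ConstantVector<int32_t>>(
    pool, /*length=*/1, /*isNull=*/false, DATE(), std::move(days));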
19 changes: 17 additions & 2 deletions velox/connectors/hive/TableHandle.h
@@ -35,6 +35,13 @@ class HiveColumnHandle : public ColumnHandle {
     kRowId,
   };

+  struct ColumnParseParameters {
+    enum PartitionDateValueFormat {
+      kISO8601,
+      kDaysSinceEpoch,
+    } partitionDateValueFormat;
+  };
+
   /// NOTE: 'dataType' is the column type in the target write table. 'hiveType'
   /// is the converted type of the corresponding column in the source table,
   /// which might not be the same type; the table scan does data coercion if needed.
@@ -45,12 +52,14 @@
       ColumnType columnType,
       TypePtr dataType,
       TypePtr hiveType,
-      std::vector<common::Subfield> requiredSubfields = {})
+      std::vector<common::Subfield> requiredSubfields = {},
+      ColumnParseParameters columnParseParameters = {})
       : name_(name),
         columnType_(columnType),
         dataType_(std::move(dataType)),
         hiveType_(std::move(hiveType)),
-        requiredSubfields_(std::move(requiredSubfields)) {
+        requiredSubfields_(std::move(requiredSubfields)),
+        columnParseParameters_(columnParseParameters) {
     VELOX_USER_CHECK(
         dataType_->equivalent(*hiveType_),
         "data type {} and hive type {} do not match",
@@ -96,6 +105,11 @@
     return columnType_ == ColumnType::kPartitionKey;
   }

+  bool isPartitionDateValueDaysSinceEpoch() const {
+    return columnParseParameters_.partitionDateValueFormat ==
+        ColumnParseParameters::kDaysSinceEpoch;
+  }
+
   std::string toString() const;

   folly::dynamic serialize() const override;
@@ -115,6 +129,7 @@
   const TypePtr dataType_;
   const TypePtr hiveType_;
   const std::vector<common::Subfield> requiredSubfields_;
+  const ColumnParseParameters columnParseParameters_;
 };

 class HiveTableHandle : public ConnectorTableHandle {
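
Callers opt in per column via the new trailing constructor argument. A default-constructed ColumnParseParameters value-initializes partitionDateValueFormat to kISO8601 (the first enumerator), so existing Hive column handles keep their behavior. A minimal sketch of building a days-since-epoch date partition column handle, mirroring the test below:

HiveColumnHandle::ColumnParseParameters parseParams;
parseParams.partitionDateValueFormat =
    HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;

auto dsHandle = std::make_shared<HiveColumnHandle>(
    "ds",
    HiveColumnHandle::ColumnType::kPartitionKey,
    DATE(), // dataType
    DATE(), // hiveType
    std::vector<common::Subfield>{},
    parseParams);

// dsHandle->isPartitionDateValueDaysSinceEpoch() now returns true, which
// testFilters and SplitReader::setPartitionValue consult when parsing "ds".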
145 changes: 118 additions & 27 deletions velox/connectors/hive/iceberg/tests/IcebergReadTest.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
  */

+#include "velox/common/base/tests/GTestUtils.h"
 #include "velox/common/file/FileSystems.h"
 #include "velox/connectors/hive/HiveConnectorSplit.h"
 #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -227,6 +228,36 @@ class HiveIcebergTest : public HiveConnectorTestBase {

   const static int rowCount = 20000;

+ protected:
+  std::shared_ptr<dwrf::Config> config_;
+  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;
+
+  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
+      const std::string& dataFilePath,
+      const std::vector<IcebergDeleteFile>& deleteFiles = {},
+      const std::unordered_map<std::string, std::optional<std::string>>
+          partitionKeys = {}) {
+    std::unordered_map<std::string, std::string> customSplitInfo;
+    customSplitInfo["table_format"] = "hive-iceberg";
+
+    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
+                    ->openFileForRead(dataFilePath);
+    const int64_t fileSize = file->size();
+
+    return std::make_shared<HiveIcebergSplit>(
+        kHiveConnectorId,
+        dataFilePath,
+        fileFomat_,
+        0,
+        fileSize,
+        partitionKeys,
+        std::nullopt,
+        customSplitInfo,
+        nullptr,
+        /*cacheable=*/true,
+        deleteFiles);
+  }
+
  private:
   std::map<std::string, std::shared_ptr<TempFilePath>> writeDataFiles(
       std::map<std::string, std::vector<int64_t>> rowGroupSizesForFiles) {
@@ -335,31 +366,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
     return vectors;
   }

-  std::shared_ptr<ConnectorSplit> makeIcebergSplit(
-      const std::string& dataFilePath,
-      const std::vector<IcebergDeleteFile>& deleteFiles = {}) {
-    std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
-    std::unordered_map<std::string, std::string> customSplitInfo;
-    customSplitInfo["table_format"] = "hive-iceberg";
-
-    auto file = filesystems::getFileSystem(dataFilePath, nullptr)
-                    ->openFileForRead(dataFilePath);
-    const int64_t fileSize = file->size();
-
-    return std::make_shared<HiveIcebergSplit>(
-        kHiveConnectorId,
-        dataFilePath,
-        fileFomat_,
-        0,
-        fileSize,
-        partitionKeys,
-        std::nullopt,
-        customSplitInfo,
-        nullptr,
-        /*cacheable=*/true,
-        deleteFiles);
-  }
-
   std::string getDuckDBQuery(
       const std::map<std::string, std::vector<int64_t>>& rowGroupSizesForFiles,
       const std::unordered_map<
@@ -478,8 +484,6 @@ class HiveIcebergTest : public HiveConnectorTestBase {
   }

   dwio::common::FileFormat fileFomat_{dwio::common::FileFormat::DWRF};
-  std::shared_ptr<dwrf::Config> config_;
-  std::function<std::unique_ptr<dwrf::DWRFFlushPolicy>()> flushPolicyFactory_;

   RowTypePtr rowType_{ROW({"c0"}, {BIGINT()})};
   std::shared_ptr<IcebergMetadataColumn> pathColumn_ =
@@ -660,4 +664,91 @@ TEST_F(HiveIcebergTest, positionalDeletesMultipleSplits) {
   assertMultipleSplits({}, 10, 3);
 }

+TEST_F(HiveIcebergTest, testPartitionedRead) {
+  RowTypePtr rowType{ROW({"c0", "ds"}, {BIGINT(), DateType::get()})};
+  std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
+  // The Iceberg API sets date partition values to daysSinceEpoch, so Velox
+  // does not need to convert them from date strings. Query two partitions:
+  // ds=17627 (2018-04-06) and ds=17628 (2018-04-07).
+  std::vector<std::shared_ptr<ConnectorSplit>> splits;
+  std::vector<std::shared_ptr<TempFilePath>> dataFilePaths;
+  for (int i = 0; i <= 1; ++i) {
+    std::vector<RowVectorPtr> dataVectors;
+    int32_t daysSinceEpoch = 17627 + i;
+    VectorPtr c0 = makeFlatVector<int64_t>((std::vector<int64_t>){i});
+    VectorPtr ds =
+        makeFlatVector<int32_t>((std::vector<int32_t>){daysSinceEpoch});
+    dataVectors.push_back(makeRowVector({"c0", "ds"}, {c0, ds}));
+
+    auto dataFilePath = TempFilePath::create();
+    dataFilePaths.push_back(dataFilePath);
+    writeToFile(
+        dataFilePath->getPath(), dataVectors, config_, flushPolicyFactory_);
+    partitionKeys["ds"] = std::to_string(daysSinceEpoch);
+    splits.emplace_back(
+        makeIcebergSplit(dataFilePath->getPath(), {}, partitionKeys));
+  }
+
+  std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
+      assignments;
+
+  assignments.insert(
+      {"c0",
+       std::make_shared<HiveColumnHandle>(
+           "c0",
+           HiveColumnHandle::ColumnType::kRegular,
+           rowType->childAt(0),
+           rowType->childAt(0))});
+
+  std::vector<common::Subfield> requiredSubFields;
+  HiveColumnHandle::ColumnParseParameters columnParseParameters;
+  columnParseParameters.partitionDateValueFormat =
+      HiveColumnHandle::ColumnParseParameters::kDaysSinceEpoch;
+  assignments.insert(
+      {"ds",
+       std::make_shared<HiveColumnHandle>(
+           "ds",
+           HiveColumnHandle::ColumnType::kPartitionKey,
+           rowType->childAt(1),
+           rowType->childAt(1),
+           std::move(requiredSubFields),
+           columnParseParameters)});
+
+  auto planBuilder = std::make_unique<PlanBuilder>(pool_.get());
+  auto plan = planBuilder->startTableScan()
+                  .outputType(rowType)
+                  .assignments(assignments)
+                  .endTableScan()
+                  .planNode();
+  HiveConnectorTestBase::assertQuery(
+      plan,
+      splits,
+      "SELECT * FROM (VALUES (0, '2018-04-06'), (1, '2018-04-07'))",
+      0);
+
+  // Test a filter on the non-partitioned, non-date column.
+  std::vector<std::string> nonPartitionFilters = {"c0 = 1"};
+  plan = planBuilder->startTableScan()
+             .outputType(rowType)
+             .assignments(assignments)
+             .subfieldFilters(nonPartitionFilters)
+             .endTableScan()
+             .planNode();
+  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 1, '2018-04-07'");
+
+  // Test a filter on the date column read from the data file (these splits
+  // carry no partition keys).
+  std::vector<std::string> filters = {"ds = date'2018-04-06'"};
+  plan = planBuilder->startTableScan()
+             .outputType(rowType)
+             .subfieldFilters(filters)
+             .endTableScan()
+             .planNode();
+  splits.clear();
+  for (const auto& dataFilePath : dataFilePaths) {
+    splits.emplace_back(makeIcebergSplit(dataFilePath->getPath(), {}, {}));
+  }
+
+  HiveConnectorTestBase::assertQuery(plan, splits, "SELECT 0, '2018-04-06'");
+}
+
 } // namespace facebook::velox::connector::hive::iceberg
