Backport to branch (3.14): Add export options validator #2454

Merged 2 commits on Jan 7, 2025
28 changes: 28 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -690,6 +690,34 @@ public enum CoreError implements ScalarDbError {
""),
DATA_LOADER_ERROR_METHOD_NULL_ARGUMENT(
Category.USER_ERROR, "0151", "Method null argument not allowed", "", ""),
ABAC_NOT_ENABLED(
Category.USER_ERROR,
"0152",
"The attribute-based access control feature is not enabled. To use this feature, you must enable it. Note that this feature is supported only in the ScalarDB Enterprise edition",
"",
""),
DATA_LOADER_CLUSTERING_KEY_NOT_FOUND(
Category.USER_ERROR, "0153", "The provided clustering key %s was not found", "", ""),
DATA_LOADER_INVALID_PROJECTION(
Category.USER_ERROR, "0154", "The column '%s' was not found", "", ""),
DATA_LOADER_INCOMPLETE_PARTITION_KEY(
Category.USER_ERROR,
"0155",
"The provided partition key is incomplete. Required key: %s",
"",
""),
DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH(
Category.USER_ERROR,
"0156",
"The provided clustering key order does not match the table schema. Required order: %s",
"",
""),
DATA_LOADER_PARTITION_KEY_ORDER_MISMATCH(
Category.USER_ERROR,
"0157",
"The provided partition key order does not match the table schema. Required order: %s",
"",
""),

//
// Errors for the concurrency error category
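Each entry above is a message template: the data loader fills the %s placeholders through buildMessage when it raises a validation error. A small hypothetical illustration (the column name "age" is made up; the exact prefix of the returned string is the error code CoreError derives from the category and id):

String message = CoreError.DATA_LOADER_INVALID_PROJECTION.buildMessage("age");
// message ends with "The column 'age' was not found"; the prefix is the
// error code built from the enum's category and id.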
14 changes: 14 additions & 0 deletions data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidationException.java
@@ -0,0 +1,14 @@
package com.scalar.db.dataloader.core.dataexport.validation;

/** A custom exception for export options validation errors. */
public class ExportOptionsValidationException extends Exception {

/**
* Creates a new {@code ExportOptionsValidationException} with the specified detail message.
*
* @param message the detail message
*/
public ExportOptionsValidationException(String message) {
super(message);
}
}
156 changes: 156 additions & 0 deletions data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidator.java
@@ -0,0 +1,156 @@
package com.scalar.db.dataloader.core.dataexport.validation;

import com.scalar.db.api.Scan;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.io.Column;
import com.scalar.db.io.Key;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* A validator for ensuring that export options are consistent with the ScalarDB table metadata and
* follow the defined constraints.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ExportOptionsValidator {

/**
* Validates the export request.
*
* @param exportOptions The export options provided by the user.
* @param tableMetadata The metadata of the ScalarDB table to validate against.
* @throws ExportOptionsValidationException If the export options are invalid.
*/
public static void validate(ExportOptions exportOptions, TableMetadata tableMetadata)
throws ExportOptionsValidationException {
LinkedHashSet<String> partitionKeyNames = tableMetadata.getPartitionKeyNames();
LinkedHashSet<String> clusteringKeyNames = tableMetadata.getClusteringKeyNames();
ScanRange scanRange = exportOptions.getScanRange();

validatePartitionKey(partitionKeyNames, exportOptions.getScanPartitionKey());
validateProjectionColumns(tableMetadata.getColumnNames(), exportOptions.getProjectionColumns());
validateSortOrders(clusteringKeyNames, exportOptions.getSortOrders());

if (scanRange.getScanStartKey() != null) {
validateClusteringKey(clusteringKeyNames, scanRange.getScanStartKey());
}
if (scanRange.getScanEndKey() != null) {
validateClusteringKey(clusteringKeyNames, scanRange.getScanEndKey());
}
}

/**
* Checks whether the provided partition key is complete and matches the partition key order
* defined in the ScalarDB table.
*
* @param partitionKeyNames the partition key names defined in the table
* @param key the ScalarDB key to validate
* @throws ExportOptionsValidationException if the key is incomplete or its column order does
*     not match the table schema
*/
private static void validatePartitionKey(LinkedHashSet<String> partitionKeyNames, Key key)
throws ExportOptionsValidationException {
if (partitionKeyNames == null || key == null) {
return;
}

// Make sure that all partition key columns are provided
if (partitionKeyNames.size() != key.getColumns().size()) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_INCOMPLETE_PARTITION_KEY.buildMessage(partitionKeyNames));
}

// Check if the order of columns in key.getColumns() matches the order in partitionKeyNames
Iterator<String> partitionKeyIterator = partitionKeyNames.iterator();
for (Column<?> column : key.getColumns()) {
// Check if the column names match in order
if (!partitionKeyIterator.hasNext()
|| !partitionKeyIterator.next().equals(column.getName())) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_PARTITION_KEY_ORDER_MISMATCH.buildMessage(partitionKeyNames));
}
}
}

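/**
* Validates that every sort order in the export options refers to a clustering-key column of
* the table.
*
* @param clusteringKeyNames the clustering key names defined in the table
* @param sortOrders the sort orders provided in the export options
* @throws ExportOptionsValidationException if a sort order references a column that is not a
*     clustering key
*/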
private static void validateSortOrders(
LinkedHashSet<String> clusteringKeyNames, List<Scan.Ordering> sortOrders)
throws ExportOptionsValidationException {
if (sortOrders == null || sortOrders.isEmpty()) {
return;
}

for (Scan.Ordering sortOrder : sortOrders) {
checkIfColumnExistsAsClusteringKey(clusteringKeyNames, sortOrder.getColumnName());
}
}

/**
* Validates that the clustering key columns in the given Key object match the expected order
* defined in the clusteringKeyNames. The Key can be a prefix of the clusteringKeyNames, but the
* order must remain consistent.
*
* @param clusteringKeyNames the expected ordered set of clustering key names
* @param key the Key object containing the actual clustering key columns
* @throws ExportOptionsValidationException if the order or names of clustering keys do not match
*/
private static void validateClusteringKey(LinkedHashSet<String> clusteringKeyNames, Key key)
throws ExportOptionsValidationException {
// If either clusteringKeyNames or key is null, no validation is needed
if (clusteringKeyNames == null || key == null) {
return;
}

// Create an iterator to traverse the clusteringKeyNames in order
Iterator<String> clusteringKeyIterator = clusteringKeyNames.iterator();

// Iterate through the columns in the given Key
for (Column<?> column : key.getColumns()) {
// If clusteringKeyNames have been exhausted but columns still exist in the Key,
// it indicates a mismatch
if (!clusteringKeyIterator.hasNext()) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH.buildMessage(clusteringKeyNames));
}

// Get the next expected clustering key name
String expectedKey = clusteringKeyIterator.next();

// Check if the current column name matches the expected clustering key name
if (!column.getName().equals(expectedKey)) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH.buildMessage(clusteringKeyNames));
}
}
}

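/**
* Checks that the given column name is one of the table's clustering keys.
*
* @param clusteringKeyNames the clustering key names defined in the table
* @param columnName the column name to check
* @throws ExportOptionsValidationException if the column is not a clustering key
*/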
private static void checkIfColumnExistsAsClusteringKey(
LinkedHashSet<String> clusteringKeyNames, String columnName)
throws ExportOptionsValidationException {
if (clusteringKeyNames == null || columnName == null) {
return;
}

if (!clusteringKeyNames.contains(columnName)) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_CLUSTERING_KEY_NOT_FOUND.buildMessage(columnName));
}
}

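/**
* Validates that every requested projection column exists in the table.
*
* @param columnNames the column names defined in the table
* @param columns the projection columns provided in the export options
* @throws ExportOptionsValidationException if a projection column does not exist in the table
*/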
private static void validateProjectionColumns(
LinkedHashSet<String> columnNames, List<String> columns)
throws ExportOptionsValidationException {
if (columns == null || columns.isEmpty()) {
return;
}

for (String column : columns) {
if (!columnNames.contains(column)) {
throw new ExportOptionsValidationException(
CoreError.DATA_LOADER_INVALID_PROJECTION.buildMessage(column));
}
}
}
}
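Callers are expected to run the validator once before issuing the export scan. A minimal usage sketch, assuming a TableMetadata instance for the target table has already been fetched; the "ns" and "tbl" names are placeholders:

Key partitionKey = Key.newBuilder().add(IntColumn.of("pk1", 1)).build();
ExportOptions exportOptions =
    ExportOptions.builder("ns", "tbl", partitionKey, FileFormat.JSON).build();
try {
  ExportOptionsValidator.validate(exportOptions, tableMetadata);
} catch (ExportOptionsValidationException e) {
  // Fail fast: surface the validation error before any scan is issued.
  System.err.println(e.getMessage());
}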
183 changes: 183 additions & 0 deletions data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataexport/validation/ExportOptionsValidatorTest.java
@@ -0,0 +1,183 @@
package com.scalar.db.dataloader.core.dataexport.validation;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.ScanRange;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.io.DataType;
import com.scalar.db.io.IntColumn;
import com.scalar.db.io.Key;
import com.scalar.db.io.TextColumn;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ExportOptionsValidatorTest {

private TableMetadata singlePkCkMetadata;
private TableMetadata multiplePkCkMetadata;
private List<String> projectedColumns;

@BeforeEach
void setup() {
singlePkCkMetadata = createMockMetadata(1, 1);
multiplePkCkMetadata = createMockMetadata(2, 2);
projectedColumns = createProjectedColumns();
}

private TableMetadata createMockMetadata(int pkCount, int ckCount) {
TableMetadata.Builder builder = TableMetadata.newBuilder();

// Add partition keys
for (int i = 1; i <= pkCount; i++) {
builder.addColumn("pk" + i, DataType.INT);
builder.addPartitionKey("pk" + i);
}

// Add clustering keys
for (int i = 1; i <= ckCount; i++) {
builder.addColumn("ck" + i, DataType.TEXT);
builder.addClusteringKey("ck" + i);
}

return builder.build();
}

private List<String> createProjectedColumns() {
List<String> columns = new ArrayList<>();
columns.add("pk1");
columns.add("ck1");
return columns;
}

@Test
void validate_withValidExportOptionsForSinglePkCk_ShouldNotThrowException()
throws ExportOptionsValidationException {

Key partitionKey = Key.newBuilder().add(IntColumn.of("pk1", 1)).build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", partitionKey, FileFormat.JSON)
.projectionColumns(projectedColumns)
.scanRange(new ScanRange(null, null, false, false))
.build();

ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata);
}

@Test
void validate_withValidExportOptionsForMultiplePkCk_ShouldNotThrowException()
throws ExportOptionsValidationException {

Key partitionKey =
Key.newBuilder().add(IntColumn.of("pk1", 1)).add(IntColumn.of("pk2", 2)).build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", partitionKey, FileFormat.JSON)
.projectionColumns(projectedColumns)
.scanRange(new ScanRange(null, null, false, false))
.build();

ExportOptionsValidator.validate(exportOptions, multiplePkCkMetadata);
}

@Test
void validate_withIncompletePartitionKeyForSinglePk_ShouldThrowException() {
Key incompletePartitionKey = Key.newBuilder().build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", incompletePartitionKey, FileFormat.JSON).build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(
CoreError.DATA_LOADER_INCOMPLETE_PARTITION_KEY.buildMessage(
singlePkCkMetadata.getPartitionKeyNames()));
}

@Test
void validate_withIncompletePartitionKeyForMultiplePks_ShouldThrowException() {
Key incompletePartitionKey = Key.newBuilder().add(IntColumn.of("pk1", 1)).build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", incompletePartitionKey, FileFormat.JSON).build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, multiplePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(
CoreError.DATA_LOADER_INCOMPLETE_PARTITION_KEY.buildMessage(
multiplePkCkMetadata.getPartitionKeyNames()));
}

@Test
void validate_withInvalidProjectionColumn_ShouldThrowException() {
ExportOptions exportOptions =
ExportOptions.builder(
"test",
"sample",
Key.newBuilder().add(IntColumn.of("pk1", 1)).build(),
FileFormat.JSON)
.projectionColumns(Collections.singletonList("invalid_column"))
.build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(CoreError.DATA_LOADER_INVALID_PROJECTION.buildMessage("invalid_column"));
}

@Test
void validate_withInvalidClusteringKeyInScanRange_ShouldThrowException() {
ScanRange scanRange =
new ScanRange(
Key.newBuilder().add(TextColumn.of("invalid_ck", "value")).build(),
Key.newBuilder().add(TextColumn.of("ck1", "value")).build(),
false,
false);

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", createValidPartitionKey(), FileFormat.JSON)
.scanRange(scanRange)
.build();

assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(CoreError.DATA_LOADER_CLUSTERING_KEY_ORDER_MISMATCH.buildMessage("[ck1]"));
}

@Test
void validate_withInvalidPartitionKeyOrder_ShouldThrowException() {
// Partition key names are expected to be "pk1", "pk2"
LinkedHashSet<String> partitionKeyNames = new LinkedHashSet<>();
partitionKeyNames.add("pk1");
partitionKeyNames.add("pk2");

// Create a partition key with reversed order, expecting an error
Key invalidPartitionKey =
Key.newBuilder()
.add(IntColumn.of("pk2", 2)) // Incorrect order
.add(IntColumn.of("pk1", 1)) // Incorrect order
.build();

ExportOptions exportOptions =
ExportOptions.builder("test", "sample", invalidPartitionKey, FileFormat.JSON)
.projectionColumns(projectedColumns)
.scanRange(new ScanRange(null, null, false, false))
.build();

// Verify that the validator throws the correct exception
assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, multiplePkCkMetadata))
.isInstanceOf(ExportOptionsValidationException.class)
.hasMessage(
CoreError.DATA_LOADER_PARTITION_KEY_ORDER_MISMATCH.buildMessage(partitionKeyNames));
}

private Key createValidPartitionKey() {
return Key.newBuilder().add(IntColumn.of("pk1", 1)).build();
}
}
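One case the suite above leaves uncovered is a sort order on a column that is not a clustering key. A hypothetical extra test sketch, assuming ExportOptions.builder exposes a sortOrders(...) setter matching getSortOrders() and that com.scalar.db.api.Scan is imported:

@Test
void validate_withSortOrderOnNonClusteringKeyColumn_ShouldThrowException() {
  ExportOptions exportOptions =
      ExportOptions.builder("test", "sample", createValidPartitionKey(), FileFormat.JSON)
          .sortOrders(
              Collections.singletonList(
                  Scan.Ordering.asc("pk1"))) // pk1 is a partition key, not a clustering key
          .build();

  assertThatThrownBy(() -> ExportOptionsValidator.validate(exportOptions, singlePkCkMetadata))
      .isInstanceOf(ExportOptionsValidationException.class)
      .hasMessage(CoreError.DATA_LOADER_CLUSTERING_KEY_NOT_FOUND.buildMessage("pk1"));
}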