auto-generated keys for cases where there is no explicit PK
zinal committed Aug 30, 2024
1 parent 61a1ef3 commit 1de1a63
Showing 6 changed files with 95 additions and 29 deletions.
13 changes: 9 additions & 4 deletions README.md
@@ -16,11 +16,12 @@ The connector has passed the basic tests with Apache Spark 3.3, 3.4 and 3.5, wor
 1. The connector does not currently support consistent reading and transactional writes. Both features are planned to be implemented using YDB's snapshots and its `CopyTables` and `RenameTables` APIs.
 1. The connector may require significant memory on the Spark executor side to read large tables with the default settings. 4 GB or more of memory per core is highly recommended.
 1. Access to YDB columnar tables is not supported yet.
-1. Reading YDB system tables (those under the `.sys` directory) is currently not supported.
 1. Predicate pushdown is limited to the primary key, a secondary index key (when accessing indexes), or their prefixes. Better pushdown support is especially important for YDB columnar tables.
 1. Reading and writing YDB tables containing columns of PostgreSQL-compatible types is not supported yet.
 1. Handling of YDB's UInt64 data type is inefficient (conversion from and to the corresponding Spark type is performed through a text representation).
-1. Joining with large YDB tables may be inefficient, because key lookups are currently not supported
+1. Reading YDB system tables (those under the `.sys` directory) is currently not supported.
+1. Joining with large YDB tables may be inefficient, because key lookups are currently not supported.
+1. When writing to YDB tables, there is no way to specify the primary key when the table is created implicitly in the "error" and "overwrite" save modes. A random unique key is generated in that case and stored in the `_spark_key` column. As a workaround, create the table explicitly instead, adding the `truncate=true` option where needed (see the sketch after this list).

 Those limitations are to be addressed in future releases of the connector.
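
For the auto-generated key limitation above, a minimal sketch of the workaround (the `ydb1` catalog, the table name, and the columns are illustrative, and passing `primary_key` through `TBLPROPERTIES` is an assumption here, not a confirmed part of the connector's API):

```scala
// Create the table explicitly so that a real primary key can be declared.
// The TBLPROPERTIES placement of "primary_key" is an assumption.
spark.sql("""
  CREATE TABLE ydb1.target_table (id BIGINT NOT NULL, val STRING)
  TBLPROPERTIES ('primary_key' = 'id')
""")
// Subsequent writes append into the pre-created table; add truncate=true
// when the existing rows should be replaced instead of appended to.
someDF.write.option("truncate", "true").mode("append").saveAsTable("ydb1.target_table")
```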

@@ -119,7 +120,8 @@ The following Spark configuration properties are supported by the YDB connector
 * `bulk` - use the BulkUpsert YDB API for data ingestion.

 * `batchsize` - maximum number of rows to be ingested in a single portion, default 500. Use with care; it typically should not exceed 1000;
-* `primary_key` - list of comma-separated column names to define the YDB table's primary key (only supported for `CREATE TABLE` operations).
+* `primary_key` - a comma-separated list of column names defining the YDB table's primary key (only supported for `CREATE TABLE` operations);
+* `truncate` - a boolean value (`true` or `false`) specifying whether the connector should truncate the existing table before writing to it.
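
For illustration, a short sketch combining these write options (assuming a YDB catalog registered as `ydb1`; `someDF` and the table name are placeholders):

```scala
// Replace the contents of an existing YDB table: truncate it first,
// then ingest in portions of up to 1000 rows each.
someDF.write
  .option("truncate", "true")   // clear the existing table before writing
  .option("batchsize", "1000")  // per the recommended upper limit above
  .mode("append")
  .saveAsTable("ydb1.table3")
```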

## Using the SQL statements with YDB catalog defined

@@ -234,9 +236,12 @@ val someDF = spark.createDataFrame(
   spark.sparkContext.parallelize(someData),
   StructType(someSchema)
 )
-// writing data to the existing table available via "ydb1" catalog
+// append data to the existing table available via "ydb1" catalog
 someDF.write.mode("append").saveAsTable("ydb1.table3")
+// replace data in the existing table available via "ydb1" catalog
+someDF.write.option("truncate", "true").mode("append").saveAsTable("ydb1.table3")
 // create the new table in the "ydb1" catalog
 someDF.write.saveAsTable("ydb1.table4")
 ```
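
With this commit, the last form above (implicit creation of `ydb1.table4` without a `primary_key`) gets an auto-generated key column instead of failing. A hedged sketch of what to expect (the printed schema is illustrative, not captured output):

```scala
// The implicitly created table carries the generated key column, which is
// filled with random 22-character URL-safe Base64 strings (see randomPk below).
spark.table("ydb1.table4").printSchema()
// root
//  |-- _spark_key: string (nullable = false)   <- auto-generated primary key
//  ... followed by the columns of someDF
```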

## Accessing YDB with Python/Spark
21 changes: 5 additions & 16 deletions src/main/java/tech/ydb/spark/connector/YdbCatalog.java
@@ -107,24 +107,13 @@ public static <T> T checkStatus(Result<T> res, String[] namespace)
     public static <T> T checkStatus(Result<T> res, Identifier id)
             throws NoSuchTableException {
         if (!res.isSuccess()) {
-            checkStatus(res.getStatus(), id);
-        }
-        return res.getValue();
-    }
-
-    public static void checkStatus(Status status, Identifier id)
-            throws NoSuchTableException {
-        if (status.isSuccess()) {
-            return;
-        }
-        if (StatusCode.SCHEME_ERROR.equals(status.getCode())) {
-            for (Issue i : status.getIssues()) {
-                if (i != null && i.getMessage().endsWith("Path not found")) {
-                    throw new NoSuchTableException(id);
-                }
+            Status status = res.getStatus();
+            if (StatusCode.SCHEME_ERROR.equals(status.getCode())) {
+                throw new NoSuchTableException(id);
             }
+            status.expectSuccess("ydb metadata query failed on " + id);
         }
-        status.expectSuccess("ydb metadata query failed on " + id);
+        return res.getValue();
     }

     @Override
5 changes: 5 additions & 0 deletions src/main/java/tech/ydb/spark/connector/YdbOptions.java
@@ -109,6 +109,11 @@ public abstract class YdbOptions {
      */
     public static final String PRIMARY_KEY = "primary_key";

+    /**
+     * YDB table's automatic primary key column name to be filled by the YDB Spark Connector.
+     */
+    public static final String AUTO_PK = "_spark_key";
+
     /**
      * YDB table's truncate option when writing to the existing table.
      */
32 changes: 32 additions & 0 deletions src/main/java/tech/ydb/spark/connector/YdbWriteOptions.java
@@ -20,6 +20,7 @@ public class YdbWriteOptions extends YdbTableOperationOptions implements Seriali
     private final StructType tableType;
     private final StructType inputType;
     private final boolean mapByNames;
+    private final String generatedPk;
     private final String queryId;
     private final YdbIngestMethod ingestMethod;
     private final HashMap<String, String> options;
@@ -32,6 +33,7 @@ public YdbWriteOptions(YdbTable table, boolean mapByNames, StructType inputType,
         this.tableType = table.schema();
         this.inputType = inputType;
         this.mapByNames = mapByNames;
+        this.generatedPk = detectGeneratedPk(this.tableType, this.inputType);
         this.queryId = queryId;
         this.options = new HashMap<>();
         if (options != null) {
@@ -62,6 +64,7 @@ public YdbWriteOptions(YdbWriteOptions src, boolean truncate) {
         this.tableType = src.tableType;
         this.inputType = src.inputType;
         this.mapByNames = src.mapByNames;
+        this.generatedPk = src.generatedPk;
         this.queryId = src.queryId;
         this.ingestMethod = src.ingestMethod;
         this.options = new HashMap<>(src.options);
@@ -93,6 +96,10 @@ public boolean isMapByNames() {
         return mapByNames;
     }

+    public String getGeneratedPk() {
+        return generatedPk;
+    }
+
     public int getMaxBulkRows() {
         return maxBulkRows;
     }
@@ -101,4 +108,29 @@ public boolean isTruncate() {
         return truncate;
     }

+    /**
+     * PK generation detection.
+     * @param tableType Target table structure
+     * @param inputType Input dataset structure
+     * @return the generated PK column name, if the target table has the automatic PK column
+     *         and it is not included in the input dataset; null otherwise
+     */
+    private static String detectGeneratedPk(StructType tableType, StructType inputType) {
+        String pkName = null;
+        for (String name : tableType.fieldNames()) {
+            if (YdbOptions.AUTO_PK.equalsIgnoreCase(name)) {
+                pkName = name;
+            }
+        }
+        if (pkName == null) {
+            return null;
+        }
+        for (String name : inputType.fieldNames()) {
+            if (pkName.equals(name)) {
+                return null;
+            }
+        }
+        return pkName;
+    }
+
 }
15 changes: 13 additions & 2 deletions src/main/java/tech/ydb/spark/connector/impl/YdbCreateTable.java
@@ -101,12 +101,23 @@ public static List<YdbFieldInfo> convert(YdbTypes types, StructType st) {
         return fields;
     }

-    public static List<String> makePrimaryKey(List<YdbFieldInfo> fields, Map<String, String> properties) {
+    private static List<String> makePrimaryKey(List<YdbFieldInfo> fields, Map<String, String> properties) {
         String value = properties.get(YdbOptions.PRIMARY_KEY);
         if (value == null) {
-            throw new IllegalArgumentException("Cannot create table without primary_key option");
+            String autoPk = grabAutoPk(fields);
+            return Arrays.asList(new String[]{autoPk});
         }
         return Arrays.asList(value.split("[,]"));
     }

+    private static String grabAutoPk(List<YdbFieldInfo> fields) {
+        for (YdbFieldInfo yfi : fields) {
+            if (YdbOptions.AUTO_PK.equalsIgnoreCase(yfi.getName())) {
+                return yfi.getName();
+            }
+        }
+        fields.add(new YdbFieldInfo(YdbOptions.AUTO_PK, YdbFieldType.Text, false));
+        return YdbOptions.AUTO_PK;
+    }

 }
38 changes: 31 additions & 7 deletions src/main/java/tech/ydb/spark/connector/impl/YdbWriterImpl.java
@@ -1,10 +1,13 @@
 package tech.ydb.spark.connector.impl;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;

 import org.apache.spark.sql.catalyst.InternalRow;
@@ -97,18 +100,35 @@ private Map<String, Value<?>> convertRow(InternalRow record) throws IOException
         for (int i = 0; i < numFields; ++i) {
             YdbFieldInfo yfi = statementFields.get(i);
             final Object value = record.get(i, inputFields.get(i).dataType());
-            Value<?> conv = types.convertToYdb(value, yfi.getType());
-            if (yfi.isNullable()) {
-                if (conv.getType().getKind() != Type.Kind.OPTIONAL) {
-                    conv = conv.makeOptional();
-                }
-            }
-            currentRow.put(yfi.getName(), conv);
+            currentRow.put(yfi.getName(), convertValue(value, yfi));
         }
+        if (statementFields.size() > numFields) {
+            // The last field should be the auto-generated PK.
+            YdbFieldInfo yfi = statementFields.get(numFields);
+            currentRow.put(yfi.getName(), convertValue(randomPk(), yfi));
+        }
         // LOG.debug("Converted input row: {}", currentRow);
         return currentRow;
     }

+    private Value<?> convertValue(Object value, YdbFieldInfo yfi) {
+        Value<?> conv = types.convertToYdb(value, yfi.getType());
+        if (yfi.isNullable()) {
+            if (conv.getType().getKind() != Type.Kind.OPTIONAL) {
+                conv = conv.makeOptional();
+            }
+        }
+        return conv;
+    }
+
+    private String randomPk() {
+        UUID uuid = UUID.randomUUID();
+        ByteBuffer bb = ByteBuffer.allocate(16);
+        bb.putLong(uuid.getMostSignificantBits());
+        bb.putLong(uuid.getLeastSignificantBits());
+        return Base64.getUrlEncoder().withoutPadding().encodeToString(bb.array());
+    }
+
     private void startNewStatement(List<Value<?>> input) {
         LOG.debug("[{}, {}] Sending a batch of {} rows into table {}",
                 partitionId, taskId, input.size(), tablePath);
@@ -198,6 +218,10 @@ private static List<YdbFieldInfo> makeStatementFields(YdbWriteOptions options,
                 out.add(yfi);
             }
         }
+        if (options.getGeneratedPk() != null) {
+            // The generated PK is the last column, if one is present at all.
+            out.add(new YdbFieldInfo(options.getGeneratedPk(), YdbFieldType.Text, false));
+        }
         return out;
     }

