TRUNCATE implementation, as discussed in #3 (#4)

Merged 1 commit on Aug 20, 2024
4 changes: 2 additions & 2 deletions pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>tech.ydb.spark</groupId>
<artifactId>ydb-spark-connector</artifactId>
- <version>1.2</version>
+ <version>1.3-SNAPSHOT</version>
<!-- <version>X.Y[-SNAPSHOT]</version> -->
<packaging>jar</packaging>
<developers>
@@ -27,7 +27,7 @@
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
- <version>2.2.6</version>
+ <version>2.2.8</version>
<type>pom</type>
<scope>import</scope>
</dependency>
4 changes: 2 additions & 2 deletions src/main/java/tech/ydb/spark/connector/YdbCatalog.java
@@ -169,7 +169,7 @@ private void listIndexes(String[] namespace, List<Identifier> retval,
}

@Override
- public Table loadTable(Identifier ident) throws NoSuchTableException {
+ public YdbTable loadTable(Identifier ident) throws NoSuchTableException {
if (ident.name().startsWith("ix/")) {
// Special support for index "tables".
return loadIndexTable(ident);
@@ -185,7 +185,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
return new YdbTable(getConnector(), mergeLocal(ident), tablePath, td);
}

- private Table loadIndexTable(Identifier ident) throws NoSuchTableException {
+ private YdbTable loadIndexTable(Identifier ident) throws NoSuchTableException {
String pseudoName = ident.name();
String[] tabParts = pseudoName.split("[/]");
if (tabParts.length != 3) {
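Note: narrowing the return type from `Table` to `YdbTable` is a covariant override of `TableCatalog.loadTable`, so internal callers get the concrete type without a cast. A minimal sketch (assumes an initialized `YdbCatalog`; the namespace and table name are made up for illustration):

```java
Identifier ident = Identifier.of(new String[]{"my_schema"}, "orders");
YdbTable table = catalog.loadTable(ident); // no cast from Table needed anymore
```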
34 changes: 19 additions & 15 deletions src/main/java/tech/ydb/spark/connector/YdbTable.java
@@ -1,7 +1,6 @@
package tech.ydb.spark.connector;

import java.util.ArrayList;
- import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -30,6 +29,7 @@
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import tech.ydb.spark.connector.impl.YdbConnector;
+ import tech.ydb.spark.connector.impl.YdbTruncateTable;
import tech.ydb.table.description.KeyRange;
import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
@@ -47,16 +47,6 @@ public class YdbTable implements Table,
private static final org.slf4j.Logger LOG
= org.slf4j.LoggerFactory.getLogger(YdbTable.class);

- static final Set<TableCapability> CAPABILITIES;
-
- static {
- final Set<TableCapability> c = new HashSet<>();
- c.add(TableCapability.BATCH_READ);
- c.add(TableCapability.BATCH_WRITE);
- c.add(TableCapability.ACCEPT_ANY_SCHEMA); // allow YDB to check the schema
- CAPABILITIES = Collections.unmodifiableSet(c);
- }
-
private final YdbConnector connector;
private final YdbTypes types;
private final String logicalName;
@@ -66,6 +56,7 @@ public class YdbTable implements Table,
private final ArrayList<YdbFieldType> keyTypes;
private final ArrayList<YdbKeyRange> partitions;
private final Map<String, String> properties;
+ private final boolean indexPseudoTable;
private StructType schema;

/**
@@ -88,6 +79,7 @@ public class YdbTable implements Table,
this.keyTypes = new ArrayList<>();
this.partitions = new ArrayList<>();
this.properties = new HashMap<>();
+ this.indexPseudoTable = false;
Map<String, TableColumn> cm = buildColumnsMap(td);
for (String cname : td.getPrimaryKeys()) {
TableColumn tc = cm.get(cname);
@@ -150,6 +142,7 @@ public class YdbTable implements Table,
this.keyTypes = new ArrayList<>();
this.partitions = new ArrayList<>();
this.properties = new HashMap<>();
+ this.indexPseudoTable = true;
HashSet<String> known = new HashSet<>();
Map<String, TableColumn> cm = buildColumnsMap(tdMain);
// Add index key columns
@@ -257,7 +250,15 @@ public Map<String, String> properties() {

@Override
public Set<TableCapability> capabilities() {
- return CAPABILITIES;
+ final Set<TableCapability> c = new HashSet<>();
+ c.add(TableCapability.BATCH_READ);
+ c.add(TableCapability.ACCEPT_ANY_SCHEMA); // allow YDB to check the schema
+ if (!indexPseudoTable) {
+ // tables support writes, while indexes do not
+ c.add(TableCapability.BATCH_WRITE);
+ c.add(TableCapability.TRUNCATE);
+ }
+ return c;
}

@Override
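The capability set is now built per table instance rather than shared through a static constant, because index pseudo-tables must not advertise write support. Declaring `TableCapability.TRUNCATE` is what makes Spark consider this table for truncating overwrites at all; conceptually the planner performs a check of this shape (simplified sketch, not Spark's actual code):

```java
if (table.capabilities().contains(TableCapability.TRUNCATE)) {
    // table is eligible for INSERT OVERWRITE / overwrite(lit(true)) plans
}
```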
@@ -274,7 +275,7 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {

@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
- return new YdbWriteBuilder(this, info);
+ return new YdbWriteBuilder(this, info, false);
}

@Override
@@ -291,8 +292,11 @@ public void deleteWhere(Filter[] filters) {

@Override
public boolean truncateTable() {
- // TODO: implementation
- throw new UnsupportedOperationException("Not supported yet.");
+ final YdbTruncateTable action = new YdbTruncateTable(tablePath);
+ connector.getRetryCtx().supplyStatus(session -> action.run(session))
+ .join()
+ .expectSuccess();
+ return true;
}

@Override
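`truncateTable()` runs the new `YdbTruncateTable` action through the SDK's `SessionRetryContext`, so transient errors are retried and a final non-success status surfaces as an exception. A minimal standalone sketch of the same pattern, assuming a retry context built from an existing `TableClient` (the table path is hypothetical):

```java
SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
YdbTruncateTable action = new YdbTruncateTable("/Root/demo/orders");
retryCtx.supplyStatus(action::run)  // each attempt receives a fresh session
        .join()                     // wait for the retries to settle
        .expectSuccess();           // throws unless the final status is SUCCESS
```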
src/main/java/tech/ydb/spark/connector/YdbTableOperationOptions.java
@@ -6,6 +6,9 @@
import java.util.List;
import java.util.Map;

+ import tech.ydb.spark.connector.impl.YdbConnector;
+ import tech.ydb.spark.connector.impl.YdbRegistry;
+
/**
* Generic YDB table operation settings.
*
@@ -68,4 +71,8 @@ public Map<String, YdbFieldInfo> getFieldsMap() {
return fieldsMap;
}

+ public YdbConnector grabConnector() {
+ return YdbRegistry.getOrCreate(catalogName, connectOptions);
+ }
+
}
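`grabConnector()` folds the `YdbRegistry.getOrCreate(catalogName, connectOptions)` lookup, previously duplicated at each call site, into the shared options base class; the scan and writer implementations further down in this diff now go through it.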
12 changes: 10 additions & 2 deletions src/main/java/tech/ydb/spark/connector/YdbWrite.java
@@ -12,6 +12,7 @@
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

+ import tech.ydb.spark.connector.impl.YdbTruncateTable;
import tech.ydb.spark.connector.impl.YdbWriterImpl;

/**
@@ -26,9 +27,9 @@ public class YdbWrite implements Serializable,

private final YdbWriteOptions options;

- YdbWrite(YdbTable table, LogicalWriteInfo lwi, boolean mapByNames) {
+ YdbWrite(YdbTable table, LogicalWriteInfo lwi, boolean mapByNames, boolean truncate) {
this.options = new YdbWriteOptions(table, mapByNames,
- lwi.schema(), lwi.queryId(), lwi.options());
+ lwi.schema(), lwi.queryId(), lwi.options(), truncate);
}

@Override
@@ -48,6 +49,13 @@ public void onDataWriterCommit(WriterCommitMessage message) {
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
// TODO: create the COW copy of the destination table
+ if (options.isTruncate()) {
+ YdbTruncateTable action = new YdbTruncateTable(options.getTablePath());
+ options.grabConnector().getRetryCtx()
+ .supplyStatus(session -> action.run(session))
+ .join()
+ .expectSuccess();
+ }
return this;
}

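Since `createBatchWriterFactory` is invoked on the driver before any `DataWriter` is instantiated, the truncation runs exactly once per job, and the executors then append into the emptied table. The flip side, as the TODO above notes, is that there is no copy-on-write safety net yet: if the subsequent write fails, the original rows are already gone.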
15 changes: 12 additions & 3 deletions src/main/java/tech/ydb/spark/connector/YdbWriteBuilder.java
@@ -5,6 +5,7 @@
import java.util.regex.Pattern;

import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+ import org.apache.spark.sql.connector.write.SupportsTruncate;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.DataType;
@@ -17,19 +18,22 @@
*
* @author zinal
*/
- public class YdbWriteBuilder implements WriteBuilder {
+ public class YdbWriteBuilder implements WriteBuilder, SupportsTruncate {

private final YdbTable table;
private final LogicalWriteInfo info;
+ private final boolean truncate;

- public YdbWriteBuilder(YdbTable table, LogicalWriteInfo info) {
+ public YdbWriteBuilder(YdbTable table, LogicalWriteInfo info, boolean truncate) {
this.table = table;
this.info = info;
+ this.truncate = truncate;
}

@Override
public Write build() {
- return new YdbWrite(table, info, validateSchemas(table.schema(), info.schema()));
+ boolean mapByNames = validateSchemas(table.schema(), info.schema());
+ return new YdbWrite(table, info, mapByNames, truncate);
}

private boolean validateSchemas(StructType actualSchema, StructType inputSchema) {
@@ -76,4 +80,9 @@ private boolean isAssignableFrom(DataType dst, DataType src) {
return true;
}

+ @Override
+ public WriteBuilder truncate() {
+ return new YdbWriteBuilder(table, info, true);
+ }
+
}
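With `SupportsTruncate` in place, Spark can rewrite an unconditional overwrite into `truncate()` followed by an append. A usage sketch from the Spark side; the catalog registration, input path and table name are all illustrative:

```java
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Assumes a SparkSession with YdbCatalog registered under the name "ydb".
Dataset<Row> df = spark.read().parquet("/tmp/orders_update");
df.writeTo("ydb.orders")
  .overwrite(lit(true)); // always-true condition is planned as truncate + append
```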
8 changes: 7 additions & 1 deletion src/main/java/tech/ydb/spark/connector/YdbWriteOptions.java
@@ -24,9 +24,10 @@ public class YdbWriteOptions extends YdbTableOperationOptions implements Seriali
private final YdbIngestMethod ingestMethod;
private final HashMap<String, String> options;
private final int maxBulkRows;
+ private final boolean truncate;

public YdbWriteOptions(YdbTable table, boolean mapByNames, StructType inputType,
- String queryId, Map<String, String> options) {
+ String queryId, Map<String, String> options, boolean truncate) {
super(table);
this.tableType = table.schema();
this.inputType = inputType;
@@ -53,6 +54,7 @@ public YdbWriteOptions(YdbTable table, boolean mapByNames, StructType inputType,
n = 500;
}
this.maxBulkRows = n;
+ this.truncate = truncate;
}

public String getQueryId() {
@@ -83,4 +85,8 @@ public int getMaxBulkRows() {
return maxBulkRows;
}

+ public boolean isTruncate() {
+ return truncate;
+ }
+
}
src/main/java/tech/ydb/spark/connector/impl/YdbReadTable.java
@@ -90,7 +90,7 @@ public void prepare() {
rtsb.withRequestTimeout(Duration.ofHours(8));

// Create or acquire the connector object.
- YdbConnector c = YdbRegistry.getOrCreate(options.getCatalogName(), options.getConnectOptions());
+ YdbConnector c = options.grabConnector();
// Obtain the session (will be a long running one).
session = c.getTableClient().createSession(
Duration.ofSeconds(options.getScanSessionSeconds())).join().getValue();
115 changes: 115 additions & 0 deletions src/main/java/tech/ydb/spark/connector/impl/YdbTruncateTable.java
@@ -0,0 +1,115 @@
package tech.ydb.spark.connector.impl;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.table.Session;
import tech.ydb.table.description.TableColumn;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.description.TableIndex;
import tech.ydb.table.settings.DescribeTableSettings;

/**
* Truncate table emulation. Steps: (1) describe table; (2) create new empty table; (3) replace the
* existing table with the empty one.
*
* @author zinal
*/
public class YdbTruncateTable {

private static final org.slf4j.Logger LOG
= org.slf4j.LoggerFactory.getLogger(YdbTruncateTable.class);

private final String tablePath;

public YdbTruncateTable(String tablePath) {
this.tablePath = tablePath;
}

public CompletableFuture<Status> run(Session session) {
LOG.debug("Truncating table {} ...", tablePath);
DescribeTableSettings dts = new DescribeTableSettings();
Result<TableDescription> dtr = session.describeTable(tablePath, dts).join();
if (!dtr.isSuccess()) {
LOG.debug("Cannot describe {} - {}", tablePath, dtr.getStatus());
return CompletableFuture.completedFuture(dtr.getStatus());
}
TableDescription td = undressDescription(dtr.getValue());
String tempPath = tablePath + "_" + randomSuffix();
Status status = session.createTable(tempPath, td).join();
if (!status.isSuccess()) {
LOG.debug("Cannot create temporary table {} - {}", tempPath, status);
return CompletableFuture.completedFuture(status);
}
LOG.debug("Created temporary table {} ...", tempPath);
status = session.renameTable(tempPath, tablePath, true).join();
if (status.isSuccess()) {
LOG.debug("Truncation successful for {}.", tablePath);
} else {
LOG.debug("Failed to replace {} with {} - {}", tablePath, tempPath, status);
Status tempStatus = session.dropTable(tempPath).join();
if (!tempStatus.isSuccess()) {
LOG.warn("Failed to drop temporary table {} - {}", tempPath, tempStatus);
}
}
return CompletableFuture.completedFuture(status);
}

private String randomSuffix() {
UUID uuid = UUID.randomUUID();
ByteBuffer bb = ByteBuffer.allocate(16);
bb.putLong(uuid.getMostSignificantBits());
bb.putLong(uuid.getLeastSignificantBits());
return Base64.getUrlEncoder().withoutPadding().encodeToString(bb.array());
}

/**
* Undress table description, omitting options that prevent table creation.
* @param src Source table description.
* @return Undressed table description.
*/
private TableDescription undressDescription(TableDescription src) {
TableDescription.Builder b = TableDescription.newBuilder();
src.getColumns().forEach(tc -> undressColumn(b, tc));
b.setPrimaryKeys(src.getPrimaryKeys());
src.getIndexes().forEach(ti -> undressIndex(b, ti));
return b.build();
}

private void undressColumn(TableDescription.Builder b, TableColumn tc) {
switch (tc.getType().getKind()) {
case OPTIONAL:
b.addNullableColumn(tc.getName(), tc.getType().unwrapOptional());
break;
default:
b.addNonnullColumn(tc.getName(), tc.getType());
}
}

private void undressIndex(TableDescription.Builder b, TableIndex ti) {
switch (ti.getType()) {
case GLOBAL:
if (ti.getDataColumns() != null) {
b.addGlobalIndex(ti.getName(), ti.getColumns(), ti.getDataColumns());
} else {
b.addGlobalIndex(ti.getName(), ti.getColumns());
}
break;
case GLOBAL_ASYNC:
if (ti.getDataColumns() != null) {
b.addGlobalAsyncIndex(ti.getName(), ti.getColumns(), ti.getDataColumns());
} else {
b.addGlobalAsyncIndex(ti.getName(), ti.getColumns());
}
break;
default:
LOG.warn("Unknown index type: {}, index {}, table {}",
ti.getType(), ti.getName(), tablePath);
}
}

}
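Two caveats about this emulation are worth noting. The swap is not atomic: rows written between the `describeTable` call and the replacing `renameTable(tempPath, tablePath, true)` disappear with the old table. And because `undressDescription` copies only columns, the primary key and indexes, any other settings on the original table are not preserved by the re-created one. From SQL the same path can be reached with a plain statement in Spark versions that route V2 `TRUNCATE TABLE` to `TruncatableTable.truncateTable()` (sketch, names illustrative):

```java
// Assumes the catalog is registered as "ydb" in the Spark session.
spark.sql("TRUNCATE TABLE ydb.orders");
```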
src/main/java/tech/ydb/spark/connector/impl/YdbWriterImpl.java
@@ -63,8 +63,7 @@ public YdbWriterImpl(YdbWriteOptions options) {
this.listType = tech.ydb.table.values.ListType.of(this.inputType);
this.ingestMethod = options.getIngestMethod();
this.maxBulkRows = options.getMaxBulkRows();
- this.connector = YdbRegistry.getOrCreate(
- options.getCatalogName(), options.getConnectOptions());
+ this.connector = options.grabConnector();
this.currentInput = new ArrayList<>();
this.currentStatus = null;
if (LOG.isDebugEnabled()) {