Skip to content

Commit

Permalink
bump sdk version, prepare for columnar tables support
Browse files Browse the repository at this point in the history
  • Loading branch information
zinal committed Nov 16, 2024
1 parent b643e28 commit e364627
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 9 deletions.
21 changes: 19 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,24 @@
<version>1.3-SNAPSHOT</version>
<!-- <version>X.Y[-SNAPSHOT]</version> -->
<packaging>jar</packaging>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0</url>
</license>
</licenses>
<scm>
<url>https://github.com/ydb-platform/ydb-spark-connector</url>
<connection>scm:git:https://github.com/ydb-platform/ydb-spark-connector.git</connection>
<developerConnection>scm:git:https://github.com/ydb-platform/ydb-spark-connector.git</developerConnection>
</scm>
<developers>
<developer>
<name>Aleksandr Gorshenin</name>
<email>[email protected]</email>
<organization>YDB</organization>
<organizationUrl>https://ydb.tech/</organizationUrl>
</developer>
<developer>
<name>Maksim Zinal</name>
<email>[email protected]</email>
Expand All @@ -27,11 +44,11 @@
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>2.2.11-SNAPSHOT</version>
<version>2.3.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<dependency> <!-- for manual dependency injection in shaded mode -->
<groupId>io.grpc</groupId>
<artifactId>grpc-bom</artifactId>
<version>1.59.1</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;

import tech.ydb.spark.connector.impl.YdbScanImpl;
import tech.ydb.spark.connector.impl.YdbScanReadTable;

/**
* Partition reader factory delivers the scan options to partition reader instances.
Expand Down Expand Up @@ -36,7 +36,7 @@ static class YdbReader implements PartitionReader<InternalRow> {

private final YdbScanOptions options;
private final YdbTablePartition partition;
private YdbScanImpl scan;
private YdbScanReadTable scan;

YdbReader(YdbScanOptions options, YdbTablePartition partition) {
this.options = options;
Expand All @@ -48,7 +48,7 @@ public boolean next() throws IOException {
if (scan == null) {
LOG.debug("Preparing scan for table {} at partition {}",
options.getTablePath(), partition);
scan = new YdbScanImpl(options, partition.getRange());
scan = new YdbScanReadTable(options, partition.getRange());
scan.prepare();
LOG.debug("Scan prepared, ready to fetch...");
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/tech/ydb/spark/connector/YdbTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public static Result<YdbTable> lookup(YdbConnector connector, YdbTypes types,
}

static YdbStoreType convertStoreType(TableDescription td) {
/* TODO: implement store type detection
switch (td.getStoreType()) {
case COLUMN:
return YdbStoreType.COLUMN;
Expand All @@ -215,6 +216,8 @@ static YdbStoreType convertStoreType(TableDescription td) {
default:
return YdbStoreType.UNSPECIFIED;
}
*/
return YdbStoreType.ROW;
}

static void fillProperties(Map<String, String> props, String tablePath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ public CompletableFuture<Status> createTable(Session session) {
}
ps.setMinPartitionsCount(minPartitions);
ps.setMaxPartitionsCount(maxPartitions);
long minSizeMb = getLongOption(YdbOptions.AP_PART_SIZE_MB, 2000L);
long minSizeMb = getLongOption(YdbOptions.AP_PART_SIZE_MB, 1000L);
if (minSizeMb < 1L) {
minSizeMb = 10L;
}
ps.setPartitionSize(minSizeMb);
tdb.setPartitioningSettings(ps);

/* TODO: implement store type configuration
switch (storeType) {
case ROW:
tdb.setStoreType(TableDescription.StoreType.ROW);
Expand All @@ -99,6 +100,7 @@ public CompletableFuture<Status> createTable(Session session) {
default:
break;
}
*/

return session.createTable(tablePath, tdb.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
*
* @author zinal
*/
public class YdbScanImpl implements AutoCloseable {
public class YdbScanReadTable implements AutoCloseable {

private static final org.slf4j.Logger LOG
= org.slf4j.LoggerFactory.getLogger(YdbScanImpl.class);
= org.slf4j.LoggerFactory.getLogger(YdbScanReadTable.class);

private static final QueueItem END_OF_SCAN = new QueueItem(null);

Expand All @@ -48,7 +48,7 @@ public class YdbScanImpl implements AutoCloseable {
private volatile GrpcReadStream<ReadTablePart> stream;
private ResultSetReader current;

public YdbScanImpl(YdbScanOptions options, YdbKeyRange keyRange) {
public YdbScanReadTable(YdbScanOptions options, YdbKeyRange keyRange) {
this.options = options;
this.keyRange = keyRange;
this.queue = new ArrayBlockingQueue<>(options.getScanQueueDepth());
Expand Down

0 comments on commit e364627

Please sign in to comment.