Skip to content

Commit

Permalink
Merge pull request #44 from zilliztech/nianliuu
Browse files Browse the repository at this point in the history
nianliuu
  • Loading branch information
nianliuu authored Jan 17, 2025
2 parents a26948d + d145b8a commit 22c1386
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 27 deletions.
2 changes: 1 addition & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -156,5 +156,5 @@ seatunnel.source.Qdrant = connector-qdrant
seatunnel.sink.Qdrant = connector-qdrant
seatunnel.source.TencentVectorDB = connector-tencent-vectordb
setunnel.source.AstraDB = connector-astradb
seatunnel.sink.Shopify = connector-shopify
seatunnel.source.Shopify = connector-shopify

4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
<revision>2.3.8-SNAPSHOT</revision>
<seatunnel.config.shade.version>2.1.1</seatunnel.config.shade.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>17</java.version>
<java.version>11</java.version>
<scala.version>2.12.15</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand Down Expand Up @@ -102,7 +102,7 @@
<elasticsearch6.client.version>6.3.1</elasticsearch6.client.version>
<elasticsearch7.client.version>7.5.1</elasticsearch7.client.version>
<flink-shaded-hadoop-2.version>2.7.5-7.0</flink-shaded-hadoop-2.version>
<commons-lang3.version>3.5</commons-lang3.version>
<commons-lang3.version>3.17.0</commons-lang3.version>
<commons-io.version>2.11.0</commons-io.version>
<commons-collections4.version>4.4</commons-collections4.version>
<commons-csv.version>1.10.0</commons-csv.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private int getBytesForArray(Object v, SeaTunnelDataType<?> dataType) {
case INT:
return getArrayNotNullSize((Integer[]) v) * 4;
case FLOAT:
return getArrayNotNullSize((Float[]) v) * 4;
return getArrayNotNullSize((Object[]) v) * 4;
case BIGINT:
return getArrayNotNullSize((Long[]) v) * 8;
case DOUBLE:
Expand Down
53 changes: 53 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,54 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
<version>1.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.12</artifactId>
<version>3.3.0</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/s3 -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.29.46</version>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/sts -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>2.29.46</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kms</artifactId>
<version>2.29.46</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>2.29.46</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>
<version>2.29.46</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
Expand Down Expand Up @@ -244,6 +292,11 @@
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.spark</pattern>
<!--suppress UnresolvedMavenProperty -->
<shadedPattern>iceberg.org.apache.spark</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

package org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;

import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
Expand All @@ -31,9 +36,12 @@
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;

import org.apache.iceberg.PartitionField;
Expand All @@ -51,6 +59,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -230,29 +239,34 @@ public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
Schema schema = icebergTable.schema();
List<Types.NestedField> columns = schema.columns();
TableSchema.Builder builder = TableSchema.builder();

columns.forEach(
nestedField -> {
String name = nestedField.name();
SeaTunnelDataType<?> seaTunnelType =
SchemaUtils.toSeaTunnelType(name, nestedField.type());
Integer scale = null;
if(seaTunnelType.equals(ArrayType.FLOAT_ARRAY_TYPE)){
seaTunnelType = VectorType.VECTOR_FLOAT_TYPE;
scale = getScale(icebergTable, name);
}
PhysicalColumn physicalColumn =
PhysicalColumn.of(
name,
seaTunnelType,
(Long) null,
scale,
nestedField.isOptional(),
null,
nestedField.doc());
builder.column(physicalColumn);
});
Optional.ofNullable(schema.identifierFieldNames())
.map(
(Function<Set<String>, Object>)
names ->
builder.primaryKey(
PrimaryKey.of(
tablePath.getTableName() + "_pk",
new ArrayList<>(names))));
if(StringUtils.isNotEmpty(readonlyConfig.get(SourceConfig.PRIMARY_KEY))){
String primaryKey = readonlyConfig.get(SourceConfig.PRIMARY_KEY);
builder.primaryKey(
PrimaryKey.of(
tablePath.getTableName() + "_pk", Collections.singletonList(primaryKey)));
}
List<String> partitionKeys =
icebergTable.spec().fields().stream()
.map(PartitionField::name)
Expand All @@ -268,6 +282,17 @@ public CatalogTable toCatalogTable(Table icebergTable, TablePath tablePath) {
catalogName);
}

private Integer getScale(Table icebergTable, String name) {
CloseableIterable<Record> result = IcebergGenerics.read(icebergTable)
.build();
if (result.iterator().hasNext()) {
Record record = result.iterator().next();
List<Object> vector = (List<Object>) record.getField(name);
return vector.size();
}
return null;
}

@Override
public PreviewResult previewAction(
ActionType actionType, TablePath tablePath, Optional<CatalogTable> catalogTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ public class SourceConfig extends CommonConfig {
.enumType(IcebergStreamScanStrategy.class)
.defaultValue(FROM_LATEST_SNAPSHOT)
.withDescription(" the iceberg strategy of stream scanning");
public static final Option<String> PRIMARY_KEY =
Options.key("primary_key")
.stringType()
.noDefaultValue()
.withDescription(" the iceberg primary key");

private Long startSnapshotTimestamp;
private Long startSnapshotId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;

import org.apache.iceberg.Schema;
Expand Down Expand Up @@ -119,6 +121,13 @@ private Object convert(
case LIST:
List icebergList = List.class.cast(icebergValue);
Types.ListType icebergListType = (Types.ListType) icebergType;
if(seaTunnelType.equals( VectorType.VECTOR_FLOAT_TYPE)){
Float[] arrays = new Float[icebergList.size()];
for (int i = 0; i < icebergList.size(); i++) {
arrays[i] = Float.parseFloat(icebergList.get(i).toString());
}
return BufferUtils.toByteBuffer(arrays);
}
List seatunnelList = new ArrayList(icebergList.size());
ArrayType seatunnelListType = (ArrayType) seaTunnelType;
for (int i = 0; i < icebergList.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.iceberg.source;

import org.apache.seatunnel.api.table.type.VectorType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.JobContext;
Expand Down Expand Up @@ -75,7 +76,7 @@ public class IcebergSource
public IcebergSource(ReadonlyConfig config, CatalogTable catalogTable) {
this.sourceConfig = SourceConfig.loadConfig(config);
this.tableSchema = loadIcebergSchema(sourceConfig);
this.seaTunnelRowType = loadSeaTunnelRowType(tableSchema, config.toConfig());
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
this.projectedSchema = tableSchema.select(seaTunnelRowType.getFieldNames());
this.catalogTable = catalogTable;
}
Expand Down Expand Up @@ -113,15 +114,18 @@ private SeaTunnelRowType loadSeaTunnelRowType(Schema tableSchema, Config pluginC

CheckResult checkResult =
CheckConfigUtil.checkAllExists(pluginConfig, TableSchemaOptions.SCHEMA.key());
SeaTunnelRowType projectedRowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
if (checkResult.isSuccess()) {
SeaTunnelRowType projectedRowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
for (int i = 0; i < projectedRowType.getFieldNames().length; i++) {
String fieldName = projectedRowType.getFieldName(i);
SeaTunnelDataType<?> projectedFieldType = projectedRowType.getFieldType(i);
int originalFieldIndex = originalRowType.indexOf(fieldName);
SeaTunnelDataType<?> originalFieldType =
originalRowType.getFieldType(originalFieldIndex);
if(projectedFieldType.equals(VectorType.VECTOR_FLOAT_TYPE)){
continue;
}
checkArgument(
projectedFieldType.equals(originalFieldType),
String.format(
Expand All @@ -130,7 +134,7 @@ private SeaTunnelRowType loadSeaTunnelRowType(Schema tableSchema, Config pluginC
}
return projectedRowType;
}
return originalRowType;
return projectedRowType;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public class MilvusSinkWriter
private final DescribeCollectionResp describeCollectionResp;
private final Boolean hasPartitionKey;

private final static AtomicLong writeCount = new AtomicLong();
private final static AtomicLong writeCache = new AtomicLong();
private final static AtomicLong writeCount = new AtomicLong(0);
private final static AtomicLong writeCache = new AtomicLong(0);

public MilvusSinkWriter(
Context context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.DataType;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.index.request.CreateIndexReq;
import lombok.extern.slf4j.Slf4j;
Expand Down
6 changes: 6 additions & 0 deletions seatunnel-dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-iceberg</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.seatunnel</groupId>-->
<!-- <artifactId>connector-assert</artifactId>-->
Expand Down
6 changes: 6 additions & 0 deletions seatunnel-examples/seatunnel-engine-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-iceberg</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-tencent-vectordb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class SeaTunnelEngineExample {

public static void main(String[] args)
throws FileNotFoundException, URISyntaxException, CommandException {
String configurePath = args.length > 0 ? args[0] : "/examples/milvus.conf";
String configurePath = args.length > 0 ? args[0] : "/examples/iceberg.conf";
String configFile = getTestConfigFile(configurePath);
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setConfigFile(configFile);
Expand Down
Loading

0 comments on commit 22c1386

Please sign in to comment.