Skip to content

Commit

Permalink
Merge pull request #48 from zilliztech/release_2.12.0
Browse files Browse the repository at this point in the history
release 2.12.0
  • Loading branch information
nianliuu authored Feb 5, 2025
2 parents 22c1386 + 2f27f04 commit 28f3469
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,20 @@ public void write(SeaTunnelRow element) {
String finalPartition = partition;
MilvusWriter batchWriter = batchWriters.computeIfAbsent(partitionId, id -> {
synchronized (batchWriters) {
if (!milvusClient.hasPartition(HasPartitionReq.builder()
.collectionName(catalogTable.getTablePath().getTableName())
.partitionName(finalPartition).build())) {
synchronized (milvusClient) {
milvusClient.createPartition(CreatePartitionReq.builder()
.collectionName(catalogTable.getTablePath().getTableName())
.partitionName(finalPartition).build());
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
if (!finalPartition.equals(DEFAULT_PARTITION)) {
Boolean hasPartition = milvusClient.hasPartition(HasPartitionReq.builder()
.collectionName(collection)
.partitionName(finalPartition).build());
if(!hasPartition) {
synchronized (milvusClient) {
milvusClient.createPartition(CreatePartitionReq.builder()
.collectionName(collection)
.partitionName(finalPartition).build());
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
throw new MilvusConnectorException(MilvusConnectionErrorCode.CREATE_PARTITION_ERROR, e);
}
}
}
}
Expand All @@ -148,7 +151,7 @@ public void write(SeaTunnelRow element) {
}
return useBulkWriter
? new MilvusBulkWriter(this.catalogTable, config, stageBucket, describeCollectionResp, finalPartition)
: new MilvusBufferBatchWriter(this.catalogTable, config, milvusClient, finalPartition);
: new MilvusBufferBatchWriter(this.catalogTable, config, milvusClient, describeCollectionResp, finalPartition);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@
@Slf4j
public class MilvusConnectorUtils {

public static Boolean hasPartitionKey(MilvusClientV2 milvusClient, String collectionName) {

DescribeCollectionResp describeCollectionResp =
milvusClient.describeCollection(
DescribeCollectionReq.builder().collectionName(collectionName).build());
public static Boolean hasPartitionKey(DescribeCollectionResp describeCollectionResp) {
return describeCollectionResp.getCollectionSchema().getFieldSchemaList().stream()
.anyMatch(CreateCollectionReq.FieldSchema::getIsPartitionKey);
}
Expand Down Expand Up @@ -68,4 +64,18 @@ public static List<String> getJsonField(CatalogTable catalogTable) {
}
return jsonColumn;
}

public static Boolean enableAutoId(MilvusClientV2 milvusClient, String collectionName) {
DescribeCollectionResp describeCollectionResp =
milvusClient.describeCollection(
DescribeCollectionReq.builder().collectionName(collectionName).build());
return describeCollectionResp.getAutoID();
}

public static Boolean enableDynamicSchema(MilvusClientV2 milvusClient, String collectionName) {
DescribeCollectionResp describeCollectionResp =
milvusClient.describeCollection(
DescribeCollectionReq.builder().collectionName(collectionName).build());
return describeCollectionResp.getEnableDynamicField();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.milvus.v2.service.collection.request.CreateCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
Expand All @@ -42,8 +41,6 @@
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
import static org.apache.seatunnel.connectors.seatunnel.milvus.sink.config.MilvusSinkConfig.ENABLE_AUTO_ID;
import static org.apache.seatunnel.connectors.seatunnel.milvus.sink.config.MilvusSinkConfig.ENABLE_DYNAMIC_FIELD;

import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -68,7 +65,6 @@ public Object convertBySeaTunnelType(
case SMALLINT:
return Short.parseShort(value.toString());
case STRING:
case DATE:
if (isJson) {
return gson.fromJson(value.toString(), JsonObject.class);
}
Expand All @@ -90,6 +86,10 @@ public Object convertBySeaTunnelType(
return Boolean.parseBoolean(value.toString());
case DOUBLE:
return Double.parseDouble(value.toString());
case TIMESTAMP:
case TIME:
case DATE:
return value.toString();
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
switch (arrayType.getElementType().getSqlType()) {
Expand Down Expand Up @@ -146,7 +146,9 @@ public static CreateCollectionReq.FieldSchema convertToFieldType(
fieldSchema.setMaxLength(65535);
break;
case DATE:
fieldSchema.setMaxLength(20);
case TIME:
case TIMESTAMP:
fieldSchema.setMaxLength(50);
break;
case STRING:
if (column.getOptions() != null
Expand Down Expand Up @@ -239,6 +241,8 @@ public static io.milvus.v2.common.DataType convertSqlTypeToDataType(SqlType sqlT
case SPARSE_FLOAT_VECTOR:
return io.milvus.v2.common.DataType.SparseFloatVector;
case DATE:
case TIME:
case TIMESTAMP:
return io.milvus.v2.common.DataType.VarChar;
case ROW:
return io.milvus.v2.common.DataType.VarChar;
Expand All @@ -249,14 +253,13 @@ public static io.milvus.v2.common.DataType convertSqlTypeToDataType(SqlType sqlT

public JsonObject buildMilvusData(
CatalogTable catalogTable,
ReadonlyConfig config,
Boolean autoId,
Boolean enableDynamicField,
List<String> jsonFields,
String dynamicField,
SeaTunnelRow element) {
SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
PrimaryKey primaryKey = catalogTable.getTableSchema().getPrimaryKey();
boolean autoId = config.get(ENABLE_AUTO_ID) != null && config.get(ENABLE_AUTO_ID);

JsonObject data = new JsonObject();
Gson gson = new Gson();
for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
Expand All @@ -276,7 +279,7 @@ public JsonObject buildMilvusData(
// if the field is dynamic field, then parse the dynamic field
if (dynamicField != null
&& dynamicField.equals(fieldName)
&& config.get(ENABLE_DYNAMIC_FIELD)) {
&& enableDynamicField) {
JsonObject dynamicData = gson.fromJson(value.toString(), JsonObject.class);
dynamicData
.entrySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.google.gson.JsonObject;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
import io.milvus.v2.service.vector.request.UpsertReq;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -45,8 +47,9 @@ public class MilvusBufferBatchWriter implements MilvusWriter {
private final String partitionName;
private final Boolean hasPartitionKey;

private final MilvusClientV2 milvusClient;
private final MilvusSinkConverter milvusSinkConverter;
private final DescribeCollectionResp descriptionCollectionResp;
private final MilvusClientV2 milvusClient;
private int batchSize;
private volatile List<JsonObject> milvusDataCache;
private final AtomicLong writeCache = new AtomicLong();
Expand All @@ -55,7 +58,9 @@ public class MilvusBufferBatchWriter implements MilvusWriter {
private final List<String> jsonFieldNames;
private final String dynamicFieldName;

public MilvusBufferBatchWriter (CatalogTable catalogTable, ReadonlyConfig config, MilvusClientV2 milvusClient, String partitionName)
public MilvusBufferBatchWriter (CatalogTable catalogTable, ReadonlyConfig config,
MilvusClientV2 milvusClient,
DescribeCollectionResp describeCollectionResp, String partitionName)
throws SeaTunnelException {
this.catalogTable = catalogTable;
this.config = config;
Expand All @@ -65,19 +70,20 @@ public MilvusBufferBatchWriter (CatalogTable catalogTable, ReadonlyConfig config

this.milvusDataCache = new ArrayList<>();
this.milvusSinkConverter = new MilvusSinkConverter();
this.milvusClient = milvusClient;

this.dynamicFieldName = MilvusConnectorUtils.getDynamicField(catalogTable);
this.jsonFieldNames = MilvusConnectorUtils.getJsonField(catalogTable);
this.hasPartitionKey = MilvusConnectorUtils.hasPartitionKey(milvusClient, collectionName);
this.milvusClient = milvusClient;
this.hasPartitionKey = MilvusConnectorUtils.hasPartitionKey(describeCollectionResp);
this.descriptionCollectionResp = describeCollectionResp;
}

@Override
public void write(SeaTunnelRow element) {
// put data to cache by partition
JsonObject data =
milvusSinkConverter.buildMilvusData(
catalogTable, config, jsonFieldNames, dynamicFieldName, element);
catalogTable, descriptionCollectionResp.getAutoID(), descriptionCollectionResp.getEnableDynamicField(), jsonFieldNames, dynamicFieldName, element);
milvusDataCache.add(data);
writeCache.incrementAndGet();
writeCount.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

@Slf4j
public class MilvusBulkWriter implements MilvusWriter {
private final DescribeCollectionResp describeCollectionResp;
RemoteBulkWriter remoteBulkWriter;
MilvusImport milvusImport;

Expand All @@ -53,6 +54,7 @@ public MilvusBulkWriter(CatalogTable catalogTable, ReadonlyConfig config, StageB
this.milvusSinkConverter = new MilvusSinkConverter();
this.dynamicFieldName = MilvusConnectorUtils.getDynamicField(catalogTable);
this.jsonFieldNames = MilvusConnectorUtils.getJsonField(catalogTable);
this.describeCollectionResp = describeCollectionResp;
String collectionName = catalogTable.getTablePath().getTableName();
StorageConnectParam storageConnectParam;
if(Objects.equals(stageBucket.getCloudId(), "az")){
Expand Down Expand Up @@ -93,7 +95,7 @@ public MilvusBulkWriter(CatalogTable catalogTable, ReadonlyConfig config, StageB
@Override
public void write(SeaTunnelRow element) throws IOException, InterruptedException {
JsonObject data = milvusSinkConverter.buildMilvusData(
catalogTable, config, jsonFieldNames, dynamicFieldName, element);
catalogTable, describeCollectionResp.getAutoID(), describeCollectionResp.getEnableDynamicField(), jsonFieldNames, dynamicFieldName, element);

remoteBulkWriter.appendRow(data);
writeCache.set(remoteBulkWriter.getBufferRowCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
import org.apache.seatunnel.connectors.seatunnel.milvus.sink.common.ControllerAPI;
import org.apache.seatunnel.connectors.seatunnel.milvus.sink.common.StageBucket;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class MilvusImport {
Expand All @@ -33,7 +32,7 @@ public class MilvusImport {
private final String partitionName;
private final String apiKey;
private final StageBucket stageBucket;
private HashMap<String, String> objectUrls = new HashMap<>();
private ConcurrentHashMap<String, String> objectUrlsMap = new ConcurrentHashMap<>();
public MilvusImport(String url, String dbName, String collectionName, String partitionName, StageBucket stageBucket) {
this.stageBucket = stageBucket;
this.clusterId = stageBucket.getInstanceId();
Expand All @@ -60,7 +59,7 @@ private String processUrl(String path) {
}

public void importData(String objectUrl) {
if(objectUrls.containsKey(objectUrl)) {
if(objectUrlsMap.containsKey(objectUrl)) {
return;
}
String objectUrlStr = processUrl(objectUrl);
Expand All @@ -86,7 +85,7 @@ public void importData(String objectUrl) {

BulkImportResponse importResponse = importToCloud(baseUrl, importRequest);

objectUrls.put(objectUrl, importResponse.getJobId());
objectUrlsMap.put(objectUrl, importResponse.getJobId());
log.info("import objectUrl: " + objectUrl + " success");
}

Expand All @@ -101,7 +100,7 @@ public void waitImportFinish() {
log.info("all import job finish");
}
public boolean checkImportFinish() {
HashSet<String> jobIds = new HashSet<>(objectUrls.values());
HashSet<String> jobIds = new HashSet<>(objectUrlsMap.values());
for(String jobId : jobIds) {
log.info("wait import job: " + jobId + " finish");
CloudDescribeImportRequest importProgress = CloudDescribeImportRequest.builder()
Expand Down

0 comments on commit 28f3469

Please sign in to comment.