From a67630228d79ea7b28f6051b1e6e36636c495471 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 11 Dec 2023 16:32:29 +0800 Subject: [PATCH 1/7] add arrow type --- flink-doris-connector/pom.xml | 9 +- .../flink/sink/batch/BatchRecordBuffer.java | 52 +++-- .../sink/batch/DorisBatchStreamLoad.java | 129 +++++------ .../flink/sink/batch/DorisBatchWriter.java | 75 +++---- .../flink/sink/writer/DorisStreamLoad.java | 111 ++++------ .../doris/flink/sink/writer/DorisWriter.java | 203 ++++++++---------- .../flink/sink/writer/LoadConstants.java | 1 + .../sink/writer/serializer/DorisRecord.java | 3 + .../serializer/DorisRecordSerializer.java | 10 +- .../writer/serializer/RowDataSerializer.java | 101 +++++++-- .../arrow/serializers/ArrowSerializer.java | 153 +++++++++++++ .../sink/writer/TestRowDataSerializer.java | 93 ++++---- 12 files changed, 556 insertions(+), 384 deletions(-) create mode 100644 flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index 76156adf7..a2550e13a 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -72,7 +72,7 @@ under the License. 1.18 2.4.2 0.16.0 - 5.0.0 + 13.0.0 3.10.1 3.3.0 3.2.1 @@ -137,6 +137,13 @@ under the License. provided + + org.apache.flink + flink-python + ${flink.version} + provided + + org.apache.thrift libthrift diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java index 297cb18cf..339ec8494 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java @@ -18,13 +18,15 @@ package org.apache.doris.flink.sink.batch; import org.apache.flink.annotation.VisibleForTesting; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -/** buffer to queue. */ + +/** + * buffer to queue + */ public class BatchRecordBuffer { private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class); public static final String LINE_SEPARATOR = "\n"; @@ -37,7 +39,7 @@ public class BatchRecordBuffer { private String database; private String table; - public BatchRecordBuffer() {} + public BatchRecordBuffer(){} public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) { super(); @@ -55,9 +57,9 @@ public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, in public void insert(byte[] record) { ensureCapacity(record.length); - if (loadBatchFirstRecord) { + if(loadBatchFirstRecord) { loadBatchFirstRecord = false; - } else { + } else if (lineDelimiter != null) { this.buffer.put(this.lineDelimiter); } this.buffer.put(record); @@ -67,8 +69,8 @@ public void insert(byte[] record) { @VisibleForTesting public void ensureCapacity(int length) { - int lineDelimiterSize = this.lineDelimiter.length; - if (buffer.remaining() - lineDelimiterSize >= length) { + int lineDelimiterSize = this.lineDelimiter == null ? 
0 : this.lineDelimiter.length; + if(buffer.remaining() - lineDelimiterSize >= length){ return; } int currentRemain = buffer.remaining(); @@ -85,12 +87,7 @@ public void ensureCapacity(int length) { tmp.put(buffer); buffer.clear(); buffer = tmp; - LOG.info( - "record length {},buffer remain {} ,grow capacity {} to {}", - length, - currentRemain, - currentCapacity, - newCapacity); + LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, newCapacity); } public String getLabelName() { @@ -101,19 +98,21 @@ public void setLabelName(String labelName) { this.labelName = labelName; } - /** @return true if buffer is empty */ + /** + * @return true if buffer is empty + */ public boolean isEmpty() { return numOfRecords == 0; } public ByteBuffer getData() { - // change mode + //change mode buffer.flip(); - LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes()); + LOG.debug("flush buffer: {} records, {} bytes",getNumOfRecords(),getBufferSizeBytes()); return buffer; } - public void clear() { + public void clear(){ this.buffer.clear(); this.numOfRecords = 0; this.bufferSizeBytes = 0; @@ -121,26 +120,33 @@ public void clear() { this.loadBatchFirstRecord = true; } - public ByteBuffer getBuffer() { + public ByteBuffer getBuffer(){ return buffer; } - - /** @return Number of records in this buffer */ + /** + * @return Number of records in this buffer + */ public int getNumOfRecords() { return numOfRecords; } - /** @return Buffer size in bytes */ + /** + * @return Buffer size in bytes + */ public int getBufferSizeBytes() { return bufferSizeBytes; } - /** @param numOfRecords Updates number of records (Usually by 1) */ + /** + * @param numOfRecords Updates number of records (Usually by 1) + */ public void setNumOfRecords(int numOfRecords) { this.numOfRecords = numOfRecords; } - /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */ + /** + * @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) + */ public void setBufferSizeBytes(int bufferSizeBytes) { this.bufferSizeBytes = bufferSizeBytes; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 7ca0cda71..e968406f5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -17,8 +17,6 @@ package org.apache.doris.flink.sink.batch; -import org.apache.flink.util.Preconditions; - import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -32,6 +30,7 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; +import org.apache.flink.util.Preconditions; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -61,16 +60,20 @@ import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; +import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; +import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; +import 
static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; -/** async stream load. */ +/** + * async stream load + **/ public class DorisBatchStreamLoad implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DorisBatchStreamLoad.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final List DORIS_SUCCESS_STATUS = - new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; @@ -90,78 +93,54 @@ public class DorisBatchStreamLoad implements Serializable { private CloseableHttpClient httpClient = new HttpUtil().getHttpClient(); private BackendUtil backendUtil; - public DorisBatchStreamLoad( - DorisOptions dorisOptions, - DorisReadOptions dorisReadOptions, - DorisExecutionOptions executionOptions, - LabelGenerator labelGenerator) { - this.backendUtil = - StringUtils.isNotEmpty(dorisOptions.getBenodes()) - ? new BackendUtil(dorisOptions.getBenodes()) - : new BackendUtil( - RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + public DorisBatchStreamLoad(DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions, + LabelGenerator labelGenerator) { + this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( + dorisOptions.getBenodes()) + : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); this.hostPort = backendUtil.getAvailableBackend(); this.username = dorisOptions.getUsername(); this.password = dorisOptions.getPassword(); this.loadProps = executionOptions.getStreamLoadProp(); this.labelGenerator = labelGenerator; - this.lineDelimiter = - EscapeHandler.escapeString( - loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) - .getBytes(); + if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) { + this.lineDelimiter = null; + } else { + this.lineDelimiter = EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes(); + } this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); - if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { + if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){ String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); - Preconditions.checkState( - tableInfo.length == 2, - "tableIdentifier input error, the format is database.table"); + Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table"); this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]); } - this.loadAsyncExecutor = new LoadAsyncExecutor(); - this.loadExecutorService = - new ThreadPoolExecutor( - 1, - 1, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(1), - new DefaultThreadFactory("streamload-executor"), - new ThreadPoolExecutor.AbortPolicy()); + this.loadAsyncExecutor= new LoadAsyncExecutor(); + this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<>(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy()); this.started = new AtomicBoolean(true); this.loadExecutorService.execute(loadAsyncExecutor); } /** * write record into cache. - * * @param record * @throws IOException */ - public synchronized void writeRecord(String database, String table, byte[] record) - throws InterruptedException { + public synchronized void writeRecord(String database, String table, byte[] record) throws InterruptedException { checkFlushException(); String bufferKey = getTableIdentifier(database, table); - BatchRecordBuffer buffer = - bufferMap.computeIfAbsent( - bufferKey, - k -> - new BatchRecordBuffer( - database, - table, - this.lineDelimiter, - executionOptions.getBufferFlushMaxBytes())); + BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k -> new BatchRecordBuffer(database, table, this.lineDelimiter, executionOptions.getBufferFlushMaxBytes())); buffer.insert(record); - // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion + //When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8 - || (executionOptions.getBufferFlushMaxRows() != 0 - && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { - flush(bufferKey, false); + || (executionOptions.getBufferFlushMaxRows() != 0 && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { + flush(bufferKey,false); } } - public synchronized void flush(String bufferKey, boolean waitUtilDone) - throws InterruptedException { + public synchronized void flush(String bufferKey, boolean waitUtilDone) throws InterruptedException { checkFlushException(); if (null == bufferKey) { for (String key : bufferMap.keySet()) { @@ -183,9 +162,9 @@ private synchronized void flushBuffer(String bufferKey) { bufferMap.remove(bufferKey); } - private void putRecordToFlushQueue(BatchRecordBuffer buffer) { + private void putRecordToFlushQueue(BatchRecordBuffer buffer){ checkFlushException(); - if (!loadThreadAlive) { + if(!loadThreadAlive){ throw new RuntimeException("load thread already exit, write was interrupted"); } try { @@ -202,7 +181,7 @@ private void checkFlushException() { } private void waitAsyncLoadFinish() { - for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) { + for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){ BatchRecordBuffer empty = new BatchRecordBuffer(); putRecordToFlushQueue(empty); } @@ -212,11 +191,11 @@ private String getTableIdentifier(String database, String table) { return database + "." + table; } - public void close() { - // close async executor + public void close(){ + //close async executor this.loadExecutorService.shutdown(); this.started.set(false); - // clear buffer + //clear buffer this.flushQueue.clear(); } @@ -229,7 +208,7 @@ public void run() { BatchRecordBuffer buffer = null; try { buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS); - if (buffer == null) { + if(buffer == null){ continue; } if (buffer.getLabelName() != null) { @@ -238,7 +217,7 @@ public void run() { } catch (Exception e) { LOG.error("worker running error", e); exception.set(e); - // clear queue to avoid writer thread blocking + //clear queue to avoid writer thread blocking flushQueue.clear(); break; } @@ -247,15 +226,15 @@ public void run() { loadThreadAlive = false; } - /** execute stream load. 
*/ - public void load(String label, BatchRecordBuffer buffer) throws IOException { + /** + * execute stream load + */ + public void load(String label, BatchRecordBuffer buffer) throws IOException{ refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); ByteBuffer data = buffer.getData(); - ByteArrayEntity entity = - new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit()); + ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit()); HttpPutBuilder putBuilder = new HttpPutBuilder(); - putBuilder - .setUrl(loadUrl) + putBuilder.setUrl(loadUrl) .baseAuth(username, password) .setLabel(label) .addCommonHeader() @@ -271,27 +250,21 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); LOG.info("load Result {}", loadResult); - RespContent respContent = - OBJECT_MAPPER.readValue(loadResult, RespContent.class); + RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = - String.format( - "stream load error: %s, see more in %s", - respContent.getMessage(), respContent.getErrorURL()); + String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()); throw new DorisBatchLoadException(errMsg); - } else { + }else{ return; } } - LOG.error( - "stream load failed with {}, reason {}, to retry", - hostPort, - response.getStatusLine().toString()); - } catch (Exception ex) { + LOG.error("stream load failed with {}, reason {}, to retry", hostPort, response.getStatusLine().toString()); + }catch (Exception ex){ if (retry == executionOptions.getMaxRetries()) { throw new DorisBatchLoadException("stream load error: ", ex); } LOG.error("stream load error with {}, to retry, cause by", hostPort, ex); + } retry++; // get available backend retry @@ -302,7 +275,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { buffer = null; } - private void refreshLoadUrl(String database, String table) { + private void refreshLoadUrl(String database, String table){ hostPort = backendUtil.getAvailableBackend(); loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java index 43aff7fd9..9dc2b936c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java @@ -17,18 +17,17 @@ package org.apache.doris.flink.sink.batch; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; - import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; +import org.apache.flink.api.connector.sink2.Sink; +import 
org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,41 +51,33 @@ public class DorisBatchWriter implements SinkWriter { private String database; private String table; - public DorisBatchWriter( - Sink.InitContext initContext, - DorisRecordSerializer serializer, - DorisOptions dorisOptions, - DorisReadOptions dorisReadOptions, - DorisExecutionOptions executionOptions) { - if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) { + public DorisBatchWriter(Sink.InitContext initContext, + DorisRecordSerializer serializer, + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions) { + if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){ String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); - Preconditions.checkState( - tableInfo.length == 2, - "tableIdentifier input error, the format is database.table"); + Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table"); this.database = tableInfo[0]; this.table = tableInfo[1]; } LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); this.labelGenerator = new LabelGenerator(labelPrefix, false); - this.scheduledExecutorService = - new ScheduledThreadPoolExecutor( - 1, new ExecutorThreadFactory("stream-load-flush-interval")); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-flush-interval")); this.serializer = serializer; this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; this.executionOptions = executionOptions; this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs(); + serializer.initial(); } public void initializeLoad() throws IOException { - this.batchStreamLoad = - new DorisBatchStreamLoad( - dorisOptions, dorisReadOptions, executionOptions, labelGenerator); - // when uploading data in streaming mode, - // we need to regularly detect whether there areexceptions. - scheduledExecutorService.scheduleWithFixedDelay( - this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS); + this.batchStreamLoad = new DorisBatchStreamLoad(dorisOptions, dorisReadOptions, executionOptions, labelGenerator); + // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. 
+ scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS); } private void intervalFlush() { @@ -101,26 +92,30 @@ private void intervalFlush() { @Override public void write(IN in, Context context) throws IOException, InterruptedException { checkFlushException(); - String db = this.database; - String tbl = this.table; - DorisRecord record = serializer.serialize(in); - if (record == null || record.getRow() == null) { - // ddl or value is null - return; - } - // multi table load - if (record.getTableIdentifier() != null) { - db = record.getDatabase(); - tbl = record.getTable(); - } - batchStreamLoad.writeRecord(db, tbl, record.getRow()); + writeOneDorisRecord(serializer.serialize(in)); } @Override public void flush(boolean flush) throws IOException, InterruptedException { checkFlushException(); + writeOneDorisRecord(serializer.flush()); LOG.info("checkpoint flush triggered."); - batchStreamLoad.flush(null, true); + batchStreamLoad.flush(null, true); + } + + public void writeOneDorisRecord(DorisRecord record) throws InterruptedException { + if(record == null || record.getRow() == null){ + //ddl or value is null + return; + } + String db = this.database; + String tbl = this.table; + //multi table load + if(record.getTableIdentifier() != null){ + db = record.getDatabase(); + tbl = record.getTable(); + } + batchStreamLoad.writeRecord(db, tbl, record.getRow()); } @Override diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index 30ff365d6..bb575d871 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -17,10 +17,6 @@ package org.apache.doris.flink.sink.writer; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; - import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -32,6 +28,9 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.ResponseUtil; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -54,10 +53,15 @@ import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST; import static org.apache.doris.flink.sink.LoadStatus.SUCCESS; import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN; +import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; +import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; +import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; -/** load data to doris. */ +/** + * load data to doris. 
+ **/ public class DorisStreamLoad implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -83,12 +87,11 @@ public class DorisStreamLoad implements Serializable { private final ExecutorService executorService; private boolean loadBatchFirstRecord; - public DorisStreamLoad( - String hostPort, - DorisOptions dorisOptions, - DorisExecutionOptions executionOptions, - LabelGenerator labelGenerator, - CloseableHttpClient httpClient) { + public DorisStreamLoad(String hostPort, + DorisOptions dorisOptions, + DorisExecutionOptions executionOptions, + LabelGenerator labelGenerator, + CloseableHttpClient httpClient) { this.hostPort = hostPort; String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); this.db = tableInfo[0]; @@ -102,24 +105,15 @@ public DorisStreamLoad( this.streamLoadProp = executionOptions.getStreamLoadProp(); this.enableDelete = executionOptions.getDeletable(); this.httpClient = httpClient; - this.executorService = - new ThreadPoolExecutor( - 1, - 1, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), - new ExecutorThreadFactory("stream-load-upload")); - this.recordStream = - new RecordStream( - executionOptions.getBufferSize(), - executionOptions.getBufferCount(), - executionOptions.isUseCache()); - lineDelimiter = - EscapeHandler.escapeString( - streamLoadProp.getProperty( - LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) - .getBytes(); + this.executorService = new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload")); + this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount(), executionOptions.isUseCache()); + if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) { + lineDelimiter = null; + } else { + lineDelimiter = EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes(); + } loadBatchFirstRecord = true; } @@ -147,7 +141,6 @@ public Future getPendingLoadFuture() { /** * try to discard pending transactions with labels beginning with labelSuffix. - * * @param labelSuffix * @param chkID * @throws Exception @@ -157,8 +150,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); while (true) { try { - // TODO: According to label abort txn. Currently, - // it can only be aborted based on txnid, + // TODO: According to label abort txn. Currently, it can only be aborted based on txnid, // so we must first request a streamload based on the label to get the txnid. 
String label = labelGenerator.generateTableLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); @@ -169,20 +161,17 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { .setLabel(label) .setEmptyEntity() .addProperties(streamLoadProp); - RespContent respContent = - handlePreCommitResponse(httpClient.execute(builder.build())); + RespContent respContent = handlePreCommitResponse(httpClient.execute(builder.build())); Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit())); if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) { // label already exist and job finished if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) { - throw new DorisException( - "Load status is " - + LABEL_ALREADY_EXIST - + " and load job finished, " - + "change you label prefix or restore from latest savepoint!"); + throw new DorisException("Load status is " + LABEL_ALREADY_EXIST + " and load job finished, " + + "change you label prefix or restore from latest savepoint!"); + } // job not finished, abort. - Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage()); + Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage()); if (matcher.find()) { Preconditions.checkState(label.equals(matcher.group(1))); long txnId = Long.parseLong(matcher.group(2)); @@ -190,10 +179,7 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { abortTransaction(txnId); } else { LOG.error("response: {}", respContent.toString()); - throw new DorisException( - "Load Status is " - + LABEL_ALREADY_EXIST - + ", but no txnID associated with it!"); + throw new DorisException("Load Status is " + LABEL_ALREADY_EXIST + ", but no txnID associated with it!"); } } else { LOG.info("abort {} for check label {}.", respContent.getTxnId(), label); @@ -211,14 +197,13 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { /** * write record into stream. 
- * * @param record * @throws IOException */ - public void writeRecord(byte[] record) throws IOException { + public void writeRecord(byte[] record) throws IOException{ if (loadBatchFirstRecord) { loadBatchFirstRecord = false; - } else { + } else if (lineDelimiter != null) { recordStream.write(lineDelimiter); } recordStream.write(record); @@ -229,7 +214,7 @@ public RecordStream getRecordStream() { return recordStream; } - public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception { + public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception{ final int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); @@ -239,12 +224,12 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); } - public RespContent stopLoad(String label) throws IOException { + public RespContent stopLoad(String label) throws IOException{ recordStream.endInput(); LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort); Preconditions.checkState(pendingLoadFuture != null); try { - return handlePreCommitResponse(pendingLoadFuture.get()); + return handlePreCommitResponse(pendingLoadFuture.get()); } catch (Exception e) { throw new DorisRuntimeException(e); } @@ -252,7 +237,6 @@ public RespContent stopLoad(String label) throws IOException { /** * start write data for new checkpoint. - * * @param label * @throws IOException */ @@ -263,8 +247,7 @@ public void startLoad(String label, boolean isResume) throws IOException { LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); try { InputStreamEntity entity = new InputStreamEntity(recordStream); - putBuilder - .setUrl(loadUrlStr) + putBuilder.setUrl(loadUrlStr) .baseAuth(user, passwd) .addCommonHeader() .addHiddenColumns(enableDelete) @@ -272,14 +255,12 @@ public void startLoad(String label, boolean isResume) throws IOException { .setEntity(entity) .addProperties(streamLoadProp); if (enable2PC) { - putBuilder.enable2PC(); + putBuilder.enable2PC(); } - pendingLoadFuture = - executorService.submit( - () -> { - LOG.info("table {} start execute load", table); - return httpClient.execute(putBuilder.build()); - }); + pendingLoadFuture = executorService.submit(() -> { + LOG.info("table {} start execute load", table); + return httpClient.execute(putBuilder.build()); + }); } catch (Exception e) { String err = "failed to stream load data with label: " + label; LOG.warn(err, e); @@ -300,18 +281,16 @@ public void abortTransaction(long txnID) throws Exception { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != 200 || response.getEntity() == null) { LOG.warn("abort transaction response: " + response.getStatusLine().toString()); - throw new DorisRuntimeException( - "Fail to abort transaction " + txnID + " with url " + abortUrlStr); + throw new DorisRuntimeException("Fail to abort transaction " + txnID + " with url " + abortUrlStr); } ObjectMapper mapper = new ObjectMapper(); String loadResult = EntityUtils.toString(response.getEntity()); - Map res = - mapper.readValue(loadResult, new TypeReference>() {}); + Map res = mapper.readValue(loadResult, new TypeReference>(){}); if (!SUCCESS.equals(res.get("status"))) { if (ResponseUtil.isCommitted(res.get("msg"))) { - throw new DorisException( - "try abort committed 
transaction, " + "do you recover from old savepoint?"); + throw new DorisException("try abort committed transaction, " + + "do you recover from old savepoint?"); } LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg")); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 3fec9414d..b588542dd 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -17,13 +17,6 @@ package org.apache.doris.flink.sink.writer; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; -import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; - import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -36,6 +29,12 @@ import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,19 +55,16 @@ /** * Doris Writer will load data to doris. 
- * * @param */ -public class DorisWriter - implements StatefulSink.StatefulSinkWriter, - TwoPhaseCommittingSink.PrecommittingSinkWriter { +public class DorisWriter implements StatefulSink.StatefulSinkWriter, + TwoPhaseCommittingSink.PrecommittingSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); - private static final List DORIS_SUCCESS_STATUS = - new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long lastCheckpointId; private long curCheckpointId; private Map dorisStreamLoadMap = new ConcurrentHashMap<>(); - private Map labelGeneratorMap = new ConcurrentHashMap<>(); + private Map labelGeneratorMap = new ConcurrentHashMap<>();; volatile boolean globalLoading; private Map loadingMap = new ConcurrentHashMap<>(); private final DorisOptions dorisOptions; @@ -83,13 +79,12 @@ public class DorisWriter private transient volatile Exception loadException = null; private BackendUtil backendUtil; - public DorisWriter( - Sink.InitContext initContext, - Collection state, - DorisRecordSerializer serializer, - DorisOptions dorisOptions, - DorisReadOptions dorisReadOptions, - DorisExecutionOptions executionOptions) { + public DorisWriter(Sink.InitContext initContext, + Collection state, + DorisRecordSerializer serializer, + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions) { this.lastCheckpointId = initContext .getRestoredCheckpointId() @@ -99,8 +94,7 @@ public DorisWriter( LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.labelPrefix = executionOptions.getLabelPrefix(); this.subtaskId = initContext.getSubtaskId(); - this.scheduledExecutorService = - new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); + this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); this.serializer = serializer; this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; @@ -109,12 +103,13 @@ public DorisWriter( this.globalLoading = false; initializeLoad(state); + serializer.initial(); } public void initializeLoad(Collection state) { this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG); try { - if (executionOptions.enabled2PC()) { + if(executionOptions.enabled2PC()) { abortLingeringTransactions(state); } } catch (Exception e) { @@ -123,42 +118,35 @@ public void initializeLoad(Collection state) { } // get main work thread. executorThread = Thread.currentThread(); - // when uploading data in streaming mode, - // we need to regularly detect whether there are exceptions. - scheduledExecutorService.scheduleWithFixedDelay( - this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); + // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. + scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); } - private void abortLingeringTransactions(Collection recoveredStates) - throws Exception { + private void abortLingeringTransactions(Collection recoveredStates) throws Exception { List alreadyAborts = new ArrayList<>(); - // abort label in state - for (DorisWriterState state : recoveredStates) { - // Todo: When the sink parallelism is reduced, - // the txn of the redundant task before aborting is also needed. 
- if (!state.getLabelPrefix().equals(labelPrefix)) { - LOG.warn( - "Label prefix from previous execution {} has changed to {}.", - state.getLabelPrefix(), - executionOptions.getLabelPrefix()); - } - if (state.getDatabase() == null || state.getTable() == null) { - LOG.warn( - "Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0."); - continue; - } - String key = state.getDatabase() + "." + state.getTable(); - DorisStreamLoad streamLoader = getStreamLoader(key); - streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); - alreadyAborts.add(state.getLabelPrefix()); + //abort label in state + for(DorisWriterState state : recoveredStates){ + // Todo: When the sink parallelism is reduced, + // the txn of the redundant task before aborting is also needed. + if(!state.getLabelPrefix().equals(labelPrefix)){ + LOG.warn("Label prefix from previous execution {} has changed to {}.", state.getLabelPrefix(), executionOptions.getLabelPrefix()); + } + if (state.getDatabase() == null || state.getTable() == null) { + LOG.warn("Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0."); + continue; + } + String key = state.getDatabase() + "." + state.getTable(); + DorisStreamLoad streamLoader = getStreamLoader(key); + streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); + alreadyAborts.add(state.getLabelPrefix()); } // TODO: In a multi-table scenario, if do not restore from checkpoint, // when modify labelPrefix at startup, we cannot abort the previous label. - if (!alreadyAborts.contains(labelPrefix) + if(!alreadyAborts.contains(labelPrefix) && StringUtils.isNotEmpty(dorisOptions.getTableIdentifier()) - && StringUtils.isNotEmpty(labelPrefix)) { - // abort current labelPrefix + && StringUtils.isNotEmpty(labelPrefix)){ + //abort current labelPrefix DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier()); streamLoader.abortPreCommit(labelPrefix, curCheckpointId); } @@ -167,20 +155,29 @@ private void abortLingeringTransactions(Collection recoveredSt @Override public void write(IN in, Context context) throws IOException { checkLoadException(); - String tableKey = dorisOptions.getTableIdentifier(); + writeOneDorisRecord(serializer.serialize(in)); + } + + @Override + public void flush(boolean endOfInput) throws IOException, InterruptedException { + writeOneDorisRecord(serializer.flush()); + } - DorisRecord record = serializer.serialize(in); - if (record == null || record.getRow() == null) { - // ddl or value is null + public void writeOneDorisRecord(DorisRecord record) throws IOException { + + if(record == null || record.getRow() == null){ + //ddl or value is null return; } - // multi table load - if (record.getTableIdentifier() != null) { + + //multi table load + String tableKey = dorisOptions.getTableIdentifier(); + if(record.getTableIdentifier() != null){ tableKey = record.getTableIdentifier(); } DorisStreamLoad streamLoader = getStreamLoader(tableKey); - if (!loadingMap.containsKey(tableKey)) { + if(!loadingMap.containsKey(tableKey)) { // start stream load only when there has data LabelGenerator labelGenerator = getLabelGenerator(tableKey); String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); @@ -191,15 +188,10 @@ public void write(IN in, Context context) throws IOException { streamLoader.writeRecord(record.getRow()); } - @Override - public void flush(boolean flush) throws IOException, InterruptedException { - // No 
action is triggered, everything is in the precommit method - } - @Override public Collection prepareCommit() throws IOException, InterruptedException { // Verify whether data is written during a checkpoint - if (!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)) { + if(!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)){ return Collections.emptyList(); } // disable exception checker before stop load. @@ -207,9 +199,9 @@ public Collection prepareCommit() throws IOException, Interrup // submit stream load http request List committableList = new ArrayList<>(); - for (Map.Entry streamLoader : dorisStreamLoadMap.entrySet()) { + for(Map.Entry streamLoader : dorisStreamLoadMap.entrySet()){ String tableIdentifier = streamLoader.getKey(); - if (!loadingMap.getOrDefault(tableIdentifier, false)) { + if(!loadingMap.getOrDefault(tableIdentifier, false)){ LOG.debug("skip table {}, no data need to load.", tableIdentifier); continue; } @@ -218,19 +210,12 @@ public Collection prepareCommit() throws IOException, Interrup String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); RespContent respContent = dorisStreamLoad.stopLoad(currentLabel); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = - String.format( - "tabel {} stream load error: %s, see more in %s", - tableIdentifier, - respContent.getMessage(), - respContent.getErrorURL()); + String errMsg = String.format("tabel {} stream load error: %s, see more in %s", tableIdentifier, respContent.getMessage(), respContent.getErrorURL()); throw new DorisRuntimeException(errMsg); } - if (executionOptions.enabled2PC()) { + if(executionOptions.enabled2PC()){ long txnId = respContent.getTxnId(); - committableList.add( - new DorisCommittable( - dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); + committableList.add(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); } } // clean loadingMap @@ -241,51 +226,40 @@ public Collection prepareCommit() throws IOException, Interrup @Override public List snapshotState(long checkpointId) throws IOException { List writerStates = new ArrayList<>(); - for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { - // Dynamic refresh backend + for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){ + //Dynamic refresh backend dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); - DorisWriterState writerState = - new DorisWriterState( - labelPrefix, - dorisStreamLoad.getDb(), - dorisStreamLoad.getTable(), - subtaskId); + DorisWriterState writerState = new DorisWriterState(labelPrefix, dorisStreamLoad.getDb(), dorisStreamLoad.getTable(), subtaskId); writerStates.add(writerState); } this.curCheckpointId = checkpointId + 1; return writerStates; } - private LabelGenerator getLabelGenerator(String tableKey) { - return labelGeneratorMap.computeIfAbsent( - tableKey, - v -> - new LabelGenerator( - labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId)); + private LabelGenerator getLabelGenerator(String tableKey){ + return labelGeneratorMap.computeIfAbsent(tableKey, v-> new LabelGenerator(labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId)); } - private DorisStreamLoad getStreamLoader(String tableKey) { + private DorisStreamLoad getStreamLoader(String tableKey){ LabelGenerator labelGenerator = getLabelGenerator(tableKey); dorisOptions.setTableIdentifier(tableKey); - return dorisStreamLoadMap.computeIfAbsent( - tableKey, - v -> - new 
DorisStreamLoad( - backendUtil.getAvailableBackend(), - dorisOptions, - executionOptions, - labelGenerator, - new HttpUtil().getHttpClient())); + return dorisStreamLoadMap.computeIfAbsent(tableKey, v -> new DorisStreamLoad(backendUtil.getAvailableBackend(), + dorisOptions, + executionOptions, + labelGenerator, + new HttpUtil().getHttpClient())); } - /** Check the streamload http request regularly. */ + /** + * Check the streamload http request regularly + */ private void checkDone() { - for (Map.Entry streamLoadMap : dorisStreamLoadMap.entrySet()) { + for(Map.Entry streamLoadMap : dorisStreamLoadMap.entrySet()){ checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); } } - private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) { + private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad){ // the load future is done and checked in prepareCommit(). // this will check error while loading. LOG.debug("start timer checker, interval {} ms", intervalTime); @@ -309,32 +283,23 @@ private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoa dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); } // start a new txn(stream load) - LOG.info( - "getting exception, breakpoint resume for checkpoint ID: {}, table {}", - curCheckpointId, - tableIdentifier); + LOG.info("getting exception, breakpoint resume for checkpoint ID: {}, table {}", curCheckpointId, tableIdentifier); LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); - dorisStreamLoad.startLoad( - labelGenerator.generateTableLabel(curCheckpointId), true); + dorisStreamLoad.startLoad(labelGenerator.generateTableLabel(curCheckpointId), true); } catch (Exception e) { throw new DorisRuntimeException(e); } } else { String errorMsg; try { - RespContent content = - dorisStreamLoad.handlePreCommitResponse( - dorisStreamLoad.getPendingLoadFuture().get()); + RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()); errorMsg = content.getMessage(); } catch (Exception e) { errorMsg = e.getMessage(); } loadException = new StreamLoadException(errorMsg); - LOG.error( - "table {} stream load finished unexpectedly, interrupt worker thread! {}", - tableIdentifier, - errorMsg); + LOG.error("table {} stream load finished unexpectedly, interrupt worker thread! {}", tableIdentifier, errorMsg); // set the executor thread interrupted in case blocking in write data. 
executorThread.interrupt(); } @@ -365,9 +330,11 @@ public void close() throws Exception { scheduledExecutorService.shutdownNow(); } if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) { - for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { + for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){ dorisStreamLoad.close(); } } + serializer.close(); } + } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java index 7b0d1d07a..2e5d29a50 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java @@ -27,6 +27,7 @@ public class LoadConstants { public static final String FORMAT_KEY = "format"; public static final String JSON = "json"; public static final String CSV = "csv"; + public static final String ARROW = "arrow"; public static final String NULL_VALUE = "\\N"; public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__"; public static final String READ_JSON_BY_LINE = "read_json_by_line"; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java index d15f07c5b..9de4c958f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java @@ -20,6 +20,9 @@ import java.io.Serializable; public class DorisRecord implements Serializable { + + public static DorisRecord EMPTY = new DorisRecord(); + private String database; private String table; private byte[] row; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java index c1135fad8..e450cebb8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java @@ -22,17 +22,23 @@ /** * How to serialize the record to bytes. - * * @param */ public interface DorisRecordSerializer extends Serializable { /** * define how to convert record into byte array. 
- * * @param record * @return [tableIdentifer,byte array] * @throws IOException */ DorisRecord serialize(T record) throws IOException; + + default void initial() {} + + default DorisRecord flush() { + return DorisRecord.EMPTY; + } + + default void close() throws Exception {} } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java index 2615fb90a..1e41bb54d 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java @@ -17,41 +17,52 @@ package org.apache.doris.flink.sink.writer.serializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.doris.flink.sink.EscapeHandler; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.doris.flink.deserialization.converter.DorisRowConverter; -import org.apache.doris.flink.sink.EscapeHandler; - +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.StringJoiner; +import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW; import static org.apache.doris.flink.sink.writer.LoadConstants.CSV; import static org.apache.doris.flink.sink.writer.LoadConstants.DORIS_DELETE_SIGN; import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE; -/** Serializer for RowData. */ +/** + * Serializer for RowData. 
+ */ public class RowDataSerializer implements DorisRecordSerializer { + private static final Logger LOG = LoggerFactory.getLogger(RowDataSerializer.class); String[] fieldNames; String type; private ObjectMapper objectMapper; private final String fieldDelimiter; private final boolean enableDelete; private final DorisRowConverter rowConverter; + private ArrowSerializer arrowSerializer; + ByteArrayOutputStream outputStream; + private final int arrowBatchCnt = 1000; + private int arrowWriteCnt = 0; + private final DataType[] dataTypes; - private RowDataSerializer( - String[] fieldNames, - DataType[] dataTypes, - String type, - String fieldDelimiter, - boolean enableDelete) { + private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) { this.fieldNames = fieldNames; this.type = type; this.fieldDelimiter = fieldDelimiter; @@ -59,23 +70,81 @@ private RowDataSerializer( if (JSON.equals(type)) { objectMapper = new ObjectMapper(); } + this.dataTypes = dataTypes; this.rowConverter = new DorisRowConverter().setExternalConverter(dataTypes); } @Override - public DorisRecord serialize(RowData record) throws IOException { + public void initial() { + if (ARROW.equals(type)) { + LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes); + RowType rowType = RowType.of(logicalTypes, fieldNames); + arrowSerializer = new ArrowSerializer(rowType, rowType); + outputStream = new ByteArrayOutputStream(); + try { + arrowSerializer.open(new ByteArrayInputStream(new byte[0]), outputStream); + } catch (Exception e) { + throw new RuntimeException("failed to open arrow serializer:", e); + } + } + } + + @Override + public DorisRecord serialize(RowData record) throws IOException{ int maxIndex = Math.min(record.getArity(), fieldNames.length); String valString; if (JSON.equals(type)) { valString = buildJsonString(record, maxIndex); } else if (CSV.equals(type)) { valString = buildCSVString(record, maxIndex); + } else if (ARROW.equals(type)) { + arrowWriteCnt += 1; + arrowSerializer.write(record); + if (arrowWriteCnt < arrowBatchCnt) { + return DorisRecord.EMPTY; + } + return arrowToDorisRecord(); } else { throw new IllegalArgumentException("The type " + type + " is not supported!"); } return DorisRecord.of(valString.getBytes(StandardCharsets.UTF_8)); } + @Override + public DorisRecord flush() { + if (JSON.equals(type) || CSV.equals(type)) { + return DorisRecord.EMPTY; + } else if (ARROW.equals(type)) { + return arrowToDorisRecord(); + } else { + throw new IllegalArgumentException("The type " + type + " is not supported!"); + } + } + + @Override + public void close() throws Exception { + if (ARROW.equals(type)) { + arrowSerializer.close(); + } + } + + public DorisRecord arrowToDorisRecord() { + if (arrowWriteCnt == 0) { + return DorisRecord.EMPTY; + } + arrowWriteCnt = 0; + try { + arrowSerializer.finishCurrentBatch(); + byte[] bytes = outputStream.toByteArray(); + outputStream.reset(); + arrowSerializer.resetWriter(); + return DorisRecord.of(bytes); + } catch (Exception e) { + LOG.error("Failed to convert arrow batch:", e); + } + return DorisRecord.EMPTY; + } + public String buildJsonString(RowData record, int maxIndex) throws IOException { int fieldIndex = 0; Map valueMap = new HashMap<>(); @@ -120,7 +189,9 @@ public static Builder builder() { return new Builder(); } - /** Builder for RowDataSerializer. */ + /** + * Builder for RowDataSerializer. 
+ */ public static class Builder { private String[] fieldNames; private DataType[] dataTypes; @@ -154,10 +225,10 @@ public Builder enableDelete(boolean deletable) { } public RowDataSerializer build() { - Preconditions.checkState( - CSV.equals(type) && fieldDelimiter != null || JSON.equals(type)); + Preconditions.checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type) || ARROW.equals(type)); Preconditions.checkNotNull(dataTypes); Preconditions.checkNotNull(fieldNames); + Preconditions.checkArgument(ARROW.equals(type) && !deletable); return new RowDataSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable); } } diff --git a/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java new file mode 100644 index 000000000..d7c2f430f --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.arrow.serializers; + +import org.apache.flink.api.python.shaded.org.apache.arrow.memory.RootAllocator; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.arrow.ArrowReader; +import org.apache.flink.table.runtime.arrow.ArrowUtils; +import org.apache.flink.table.runtime.arrow.ArrowWriter; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator; +import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamWriter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * this code is copied from flink-python, and modified finishCurrentBatch to add end operation. + *
+ * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes. + */ +public final class ArrowSerializer { + + static { + ArrowUtils.checkArrowUsable(); + } + + /** The input RowType. */ + protected final RowType inputType; + + /** The output RowType. */ + protected final RowType outputType; + + /** Allocator which is used for byte buffer allocation. */ + private transient BufferAllocator allocator; + + /** Reader which is responsible for deserialize the Arrow format data to the Flink rows. */ + private transient ArrowReader arrowReader; + + /** + * Reader which is responsible for convert the execution result from byte array to arrow format. + */ + private transient ArrowStreamReader arrowStreamReader; + + /** + * Container that holds a set of vectors for the input elements to be sent to the Python worker. + */ + transient VectorSchemaRoot rootWriter; + + /** Writer which is responsible for serialize the input elements to arrow format. */ + private transient ArrowWriter arrowWriter; + + /** Writer which is responsible for convert the arrow format data into byte array. */ + private transient ArrowStreamWriter arrowStreamWriter; + + /** Reusable InputStream used to holding the execution results to be deserialized. */ + private transient InputStream bais; + + /** Reusable OutputStream used to holding the serialized input elements. */ + private transient OutputStream baos; + + public ArrowSerializer(RowType inputType, RowType outputType) { + this.inputType = inputType; + this.outputType = outputType; + } + + public void open(InputStream bais, OutputStream baos) throws Exception { + this.bais = bais; + this.baos = baos; + allocator = new RootAllocator(Long.MAX_VALUE); + arrowStreamReader = new ArrowStreamReader(bais, allocator); + + rootWriter = VectorSchemaRoot.create(ArrowUtils.toArrowSchema(inputType), allocator); + arrowWriter = createArrowWriter(); + arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos); + arrowStreamWriter.start(); + } + + public int load() throws IOException { + arrowStreamReader.loadNextBatch(); + VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); + if (arrowReader == null) { + arrowReader = createArrowReader(root); + } + return root.getRowCount(); + } + + public RowData read(int i) { + return arrowReader.read(i); + } + + public void write(RowData element) { + arrowWriter.write(element); + } + + public void close() throws Exception { + arrowStreamWriter.end(); + arrowStreamReader.close(); + rootWriter.close(); + allocator.close(); + } + + /** Creates an {@link ArrowWriter}. */ + public ArrowWriter createArrowWriter() { + return ArrowUtils.createRowDataArrowWriter(rootWriter, inputType); + } + + public ArrowReader createArrowReader(VectorSchemaRoot root) { + return ArrowUtils.createArrowReader(root, outputType); + } + + /** + * Forces to finish the processing of the current batch of elements. It will serialize the batch + * of elements into one arrow batch. 
+ */ + public void finishCurrentBatch() throws Exception { + arrowWriter.finish(); + arrowStreamWriter.writeBatch(); + arrowStreamWriter.end(); + arrowWriter.reset(); + } + + public void resetReader() throws IOException { + arrowReader = null; + arrowStreamReader.close(); + arrowStreamReader = new ArrowStreamReader(bais, allocator); + } + + public void resetWriter() throws IOException { + arrowStreamWriter = new ArrowStreamWriter(rootWriter, null, baos); + arrowStreamWriter.start(); + } +} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java index d1028fe74..951250d4a 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java @@ -17,24 +17,33 @@ package org.apache.doris.flink.sink.writer; +import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; +import org.apache.flink.table.types.DataType; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; -import org.apache.flink.table.types.DataType; -import org.apache.flink.types.RowKind; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.RowKind; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; -/** test for RowDataSerializer. */ +/** + * test for RowDataSerializer. 
+ */ public class TestRowDataSerializer { static GenericRowData rowData; static DataType[] dataTypes; @@ -47,18 +56,14 @@ public static void setUp() { rowData.setField(1, StringData.fromString("test")); rowData.setField(2, 60.2); rowData.setRowKind(RowKind.INSERT); - dataTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()}; - fieldNames = new String[] {"id", "name", "weight"}; + dataTypes = new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()}; + fieldNames = new String[]{"id", "name", "weight"}; } @Test public void testSerializeCsv() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames) - .setFieldType(dataTypes) - .setType("csv") - .setFieldDelimiter("|") - .enableDelete(false); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue); @@ -67,18 +72,11 @@ public void testSerializeCsv() throws IOException { @Test public void testSerializeJson() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames) - .setFieldType(dataTypes) - .setType("json") - .setFieldDelimiter("|") - .enableDelete(false); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); ObjectMapper objectMapper = new ObjectMapper(); - Map valueMap = - objectMapper.readValue( - new String(serializedValue, StandardCharsets.UTF_8), - new TypeReference>() {}); + Map valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference>(){}); Assert.assertEquals("3", valueMap.get("id")); Assert.assertEquals("test", valueMap.get("name")); Assert.assertEquals("60.2", valueMap.get("weight")); @@ -87,11 +85,7 @@ public void testSerializeJson() throws IOException { @Test public void testSerializeCsvWithSign() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames) - .setFieldType(dataTypes) - .setType("csv") - .setFieldDelimiter("|") - .enableDelete(true); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue); @@ -100,18 +94,11 @@ public void testSerializeCsvWithSign() throws IOException { @Test public void testSerializeJsonWithSign() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames) - .setFieldType(dataTypes) - .setType("json") - .setFieldDelimiter("|") - .enableDelete(true); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); ObjectMapper objectMapper = new ObjectMapper(); - Map valueMap = - objectMapper.readValue( - new String(serializedValue, StandardCharsets.UTF_8), - new TypeReference>() {}); + Map valueMap = 
objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference>(){}); Assert.assertEquals("3", valueMap.get("id")); Assert.assertEquals("test", valueMap.get("name")); Assert.assertEquals("60.2", valueMap.get("weight")); @@ -121,15 +108,39 @@ public void testSerializeJsonWithSign() throws IOException { @Test public void testParseDeleteSign() { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames) - .setFieldType(dataTypes) - .setType("json") - .setFieldDelimiter("|") - .enableDelete(true); + builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); RowDataSerializer serializer = builder.build(); Assert.assertEquals("0", serializer.parseDeleteSign(RowKind.INSERT)); Assert.assertEquals("0", serializer.parseDeleteSign(RowKind.UPDATE_AFTER)); Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.DELETE)); Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.UPDATE_BEFORE)); } + @Test + public void testArrowType() throws Exception { + RowDataSerializer serializer = RowDataSerializer.builder() + .setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("arrow") + .enableDelete(false) + .build(); + + // write data to binary + serializer.initial(); + serializer.serialize(rowData); + byte[] serializedValue = serializer.flush().getRow(); + + // read data from binary + LogicalType[] logicalTypes = TypeConversions.fromDataToLogicalType(dataTypes); + RowType rowType = RowType.of(logicalTypes, fieldNames); + ArrowSerializer arrowSerializer = new ArrowSerializer(rowType, rowType); + ByteArrayInputStream input = new ByteArrayInputStream(serializedValue); + arrowSerializer.open(input, new ByteArrayOutputStream(0)); + int cnt = arrowSerializer.load(); + RowData data = arrowSerializer.read(0); + + Assert.assertEquals(1, cnt); + Assert.assertEquals(3, data.getInt(0)); + Assert.assertEquals("test", data.getString(1).toString()); + Assert.assertEquals(60.2, data.getDouble(2), 0.001); + } } From 7e6e856d44aa775738254782dbc3bdcf719013a5 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 18 Dec 2023 16:49:21 +0800 Subject: [PATCH 2/7] checkstyle --- .../flink/sink/batch/BatchRecordBuffer.java | 47 ++--- .../sink/batch/DorisBatchStreamLoad.java | 123 +++++++----- .../flink/sink/batch/DorisBatchWriter.java | 52 +++-- .../flink/sink/writer/DorisStreamLoad.java | 102 ++++++---- .../doris/flink/sink/writer/DorisWriter.java | 181 +++++++++++------- .../serializer/DorisRecordSerializer.java | 2 + .../writer/serializer/RowDataSerializer.java | 29 +-- .../arrow/serializers/ArrowSerializer.java | 14 +- .../sink/writer/TestRowDataSerializer.java | 76 +++++--- 9 files changed, 382 insertions(+), 244 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java index 339ec8494..45e683df5 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java @@ -18,15 +18,13 @@ package org.apache.doris.flink.sink.batch; import org.apache.flink.annotation.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; - -/** - * buffer to queue - */ +/** buffer to queue */ public class BatchRecordBuffer { private static final Logger LOG = 
LoggerFactory.getLogger(BatchRecordBuffer.class); public static final String LINE_SEPARATOR = "\n"; @@ -39,7 +37,7 @@ public class BatchRecordBuffer { private String database; private String table; - public BatchRecordBuffer(){} + public BatchRecordBuffer() {} public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) { super(); @@ -57,7 +55,7 @@ public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, in public void insert(byte[] record) { ensureCapacity(record.length); - if(loadBatchFirstRecord) { + if (loadBatchFirstRecord) { loadBatchFirstRecord = false; } else if (lineDelimiter != null) { this.buffer.put(this.lineDelimiter); @@ -70,7 +68,7 @@ public void insert(byte[] record) { @VisibleForTesting public void ensureCapacity(int length) { int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length; - if(buffer.remaining() - lineDelimiterSize >= length){ + if (buffer.remaining() - lineDelimiterSize >= length) { return; } int currentRemain = buffer.remaining(); @@ -87,7 +85,12 @@ public void ensureCapacity(int length) { tmp.put(buffer); buffer.clear(); buffer = tmp; - LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, newCapacity); + LOG.info( + "record length {},buffer remain {} ,grow capacity {} to {}", + length, + currentRemain, + currentCapacity, + newCapacity); } public String getLabelName() { @@ -98,21 +101,19 @@ public void setLabelName(String labelName) { this.labelName = labelName; } - /** - * @return true if buffer is empty - */ + /** @return true if buffer is empty */ public boolean isEmpty() { return numOfRecords == 0; } public ByteBuffer getData() { - //change mode + // change mode buffer.flip(); - LOG.debug("flush buffer: {} records, {} bytes",getNumOfRecords(),getBufferSizeBytes()); + LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes()); return buffer; } - public void clear(){ + public void clear() { this.buffer.clear(); this.numOfRecords = 0; this.bufferSizeBytes = 0; @@ -120,33 +121,25 @@ public void clear(){ this.loadBatchFirstRecord = true; } - public ByteBuffer getBuffer(){ + public ByteBuffer getBuffer() { return buffer; } - /** - * @return Number of records in this buffer - */ + /** @return Number of records in this buffer */ public int getNumOfRecords() { return numOfRecords; } - /** - * @return Buffer size in bytes - */ + /** @return Buffer size in bytes */ public int getBufferSizeBytes() { return bufferSizeBytes; } - /** - * @param numOfRecords Updates number of records (Usually by 1) - */ + /** @param numOfRecords Updates number of records (Usually by 1) */ public void setNumOfRecords(int numOfRecords) { this.numOfRecords = numOfRecords; } - /** - * @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) - */ + /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */ public void setBufferSizeBytes(int bufferSizeBytes) { this.bufferSizeBytes = bufferSizeBytes; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index e968406f5..03b9fd9ff 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -17,6 +17,8 @@ package org.apache.doris.flink.sink.batch; 
+import org.apache.flink.util.Preconditions; + import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -30,7 +32,6 @@ import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.LabelGenerator; -import org.apache.flink.util.Preconditions; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -66,14 +67,13 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; -/** - * async stream load - **/ +/** async stream load */ public class DorisBatchStreamLoad implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DorisBatchStreamLoad.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private static final List DORIS_SUCCESS_STATUS = + new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final LabelGenerator labelGenerator; private final byte[] lineDelimiter; private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load"; @@ -93,13 +93,16 @@ public class DorisBatchStreamLoad implements Serializable { private CloseableHttpClient httpClient = new HttpUtil().getHttpClient(); private BackendUtil backendUtil; - public DorisBatchStreamLoad(DorisOptions dorisOptions, - DorisReadOptions dorisReadOptions, - DorisExecutionOptions executionOptions, - LabelGenerator labelGenerator) { - this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil( - dorisOptions.getBenodes()) - : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + public DorisBatchStreamLoad( + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions, + LabelGenerator labelGenerator) { + this.backendUtil = + StringUtils.isNotEmpty(dorisOptions.getBenodes()) + ? 
new BackendUtil(dorisOptions.getBenodes()) + : new BackendUtil( + RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); this.hostPort = backendUtil.getAvailableBackend(); this.username = dorisOptions.getUsername(); this.password = dorisOptions.getPassword(); @@ -108,39 +111,65 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions, if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) { this.lineDelimiter = null; } else { - this.lineDelimiter = EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes(); + this.lineDelimiter = + EscapeHandler.escapeString( + loadProps.getProperty( + LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) + .getBytes(); } this.executionOptions = executionOptions; this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize()); - if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){ + if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) { String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); - Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table"); + Preconditions.checkState( + tableInfo.length == 2, + "tableIdentifier input error, the format is database.table"); this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]); } - this.loadAsyncExecutor= new LoadAsyncExecutor(); - this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy()); + this.loadAsyncExecutor = new LoadAsyncExecutor(); + this.loadExecutorService = + new ThreadPoolExecutor( + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(1), + new DefaultThreadFactory("streamload-executor"), + new ThreadPoolExecutor.AbortPolicy()); this.started = new AtomicBoolean(true); this.loadExecutorService.execute(loadAsyncExecutor); } /** * write record into cache. 
+ * * @param record * @throws IOException */ - public synchronized void writeRecord(String database, String table, byte[] record) throws InterruptedException { + public synchronized void writeRecord(String database, String table, byte[] record) + throws InterruptedException { checkFlushException(); String bufferKey = getTableIdentifier(database, table); - BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k -> new BatchRecordBuffer(database, table, this.lineDelimiter, executionOptions.getBufferFlushMaxBytes())); + BatchRecordBuffer buffer = + bufferMap.computeIfAbsent( + bufferKey, + k -> + new BatchRecordBuffer( + database, + table, + this.lineDelimiter, + executionOptions.getBufferFlushMaxBytes())); buffer.insert(record); - //When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion + // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8 - || (executionOptions.getBufferFlushMaxRows() != 0 && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { - flush(bufferKey,false); + || (executionOptions.getBufferFlushMaxRows() != 0 + && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) { + flush(bufferKey, false); } } - public synchronized void flush(String bufferKey, boolean waitUtilDone) throws InterruptedException { + public synchronized void flush(String bufferKey, boolean waitUtilDone) + throws InterruptedException { checkFlushException(); if (null == bufferKey) { for (String key : bufferMap.keySet()) { @@ -162,9 +191,9 @@ private synchronized void flushBuffer(String bufferKey) { bufferMap.remove(bufferKey); } - private void putRecordToFlushQueue(BatchRecordBuffer buffer){ + private void putRecordToFlushQueue(BatchRecordBuffer buffer) { checkFlushException(); - if(!loadThreadAlive){ + if (!loadThreadAlive) { throw new RuntimeException("load thread already exit, write was interrupted"); } try { @@ -181,7 +210,7 @@ private void checkFlushException() { } private void waitAsyncLoadFinish() { - for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){ + for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) { BatchRecordBuffer empty = new BatchRecordBuffer(); putRecordToFlushQueue(empty); } @@ -191,11 +220,11 @@ private String getTableIdentifier(String database, String table) { return database + "." 
+ table; } - public void close(){ - //close async executor + public void close() { + // close async executor this.loadExecutorService.shutdown(); this.started.set(false); - //clear buffer + // clear buffer this.flushQueue.clear(); } @@ -208,7 +237,7 @@ public void run() { BatchRecordBuffer buffer = null; try { buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS); - if(buffer == null){ + if (buffer == null) { continue; } if (buffer.getLabelName() != null) { @@ -217,7 +246,7 @@ public void run() { } catch (Exception e) { LOG.error("worker running error", e); exception.set(e); - //clear queue to avoid writer thread blocking + // clear queue to avoid writer thread blocking flushQueue.clear(); break; } @@ -226,15 +255,15 @@ public void run() { loadThreadAlive = false; } - /** - * execute stream load - */ - public void load(String label, BatchRecordBuffer buffer) throws IOException{ + /** execute stream load */ + public void load(String label, BatchRecordBuffer buffer) throws IOException { refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); ByteBuffer data = buffer.getData(); - ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit()); + ByteArrayEntity entity = + new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit()); HttpPutBuilder putBuilder = new HttpPutBuilder(); - putBuilder.setUrl(loadUrl) + putBuilder + .setUrl(loadUrl) .baseAuth(username, password) .setLabel(label) .addCommonHeader() @@ -250,21 +279,27 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException{ if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); LOG.info("load Result {}", loadResult); - RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class); + RespContent respContent = + OBJECT_MAPPER.readValue(loadResult, RespContent.class); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL()); + String errMsg = + String.format( + "stream load error: %s, see more in %s", + respContent.getMessage(), respContent.getErrorURL()); throw new DorisBatchLoadException(errMsg); - }else{ + } else { return; } } - LOG.error("stream load failed with {}, reason {}, to retry", hostPort, response.getStatusLine().toString()); - }catch (Exception ex){ + LOG.error( + "stream load failed with {}, reason {}, to retry", + hostPort, + response.getStatusLine().toString()); + } catch (Exception ex) { if (retry == executionOptions.getMaxRetries()) { throw new DorisBatchLoadException("stream load error: ", ex); } LOG.error("stream load error with {}, to retry, cause by", hostPort, ex); - } retry++; // get available backend retry @@ -275,7 +310,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException{ buffer = null; } - private void refreshLoadUrl(String database, String table){ + private void refreshLoadUrl(String database, String table) { hostPort = backendUtil.getAvailableBackend(); loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java index 9dc2b936c..6a6576c44 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java +++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java @@ -17,17 +17,18 @@ package org.apache.doris.flink.sink.batch; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.writer.LabelGenerator; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.SinkWriter; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.StringUtils; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,21 +52,26 @@ public class DorisBatchWriter implements SinkWriter { private String database; private String table; - public DorisBatchWriter(Sink.InitContext initContext, - DorisRecordSerializer serializer, - DorisOptions dorisOptions, - DorisReadOptions dorisReadOptions, - DorisExecutionOptions executionOptions) { - if(!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())){ + public DorisBatchWriter( + Sink.InitContext initContext, + DorisRecordSerializer serializer, + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions) { + if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) { String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); - Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table"); + Preconditions.checkState( + tableInfo.length == 2, + "tableIdentifier input error, the format is database.table"); this.database = tableInfo[0]; this.table = tableInfo[1]; } LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId(); this.labelGenerator = new LabelGenerator(labelPrefix, false); - this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-flush-interval")); + this.scheduledExecutorService = + new ScheduledThreadPoolExecutor( + 1, new ExecutorThreadFactory("stream-load-flush-interval")); this.serializer = serializer; this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; @@ -75,9 +81,13 @@ public DorisBatchWriter(Sink.InitContext initContext, } public void initializeLoad() throws IOException { - this.batchStreamLoad = new DorisBatchStreamLoad(dorisOptions, dorisReadOptions, executionOptions, labelGenerator); - // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. - scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS); + this.batchStreamLoad = + new DorisBatchStreamLoad( + dorisOptions, dorisReadOptions, executionOptions, labelGenerator); + // when uploading data in streaming mode, we need to regularly detect whether there are + // exceptions. 
+ scheduledExecutorService.scheduleWithFixedDelay( + this::intervalFlush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS); } private void intervalFlush() { @@ -100,18 +110,18 @@ public void flush(boolean flush) throws IOException, InterruptedException { checkFlushException(); writeOneDorisRecord(serializer.flush()); LOG.info("checkpoint flush triggered."); - batchStreamLoad.flush(null, true); + batchStreamLoad.flush(null, true); } public void writeOneDorisRecord(DorisRecord record) throws InterruptedException { - if(record == null || record.getRow() == null){ - //ddl or value is null + if (record == null || record.getRow() == null) { + // ddl or value is null return; } String db = this.database; String tbl = this.table; - //multi table load - if(record.getTableIdentifier() != null){ + // multi table load + if (record.getTableIdentifier() != null) { db = record.getDatabase(); tbl = record.getTable(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java index bb575d871..b095eb9e9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java @@ -17,6 +17,10 @@ package org.apache.doris.flink.sink.writer; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.doris.flink.cfg.DorisExecutionOptions; @@ -28,9 +32,6 @@ import org.apache.doris.flink.sink.EscapeHandler; import org.apache.doris.flink.sink.HttpPutBuilder; import org.apache.doris.flink.sink.ResponseUtil; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.entity.InputStreamEntity; import org.apache.http.impl.client.CloseableHttpClient; @@ -59,9 +60,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; -/** - * load data to doris. - **/ +/** load data to doris. 
*/ public class DorisStreamLoad implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -87,11 +86,12 @@ public class DorisStreamLoad implements Serializable { private final ExecutorService executorService; private boolean loadBatchFirstRecord; - public DorisStreamLoad(String hostPort, - DorisOptions dorisOptions, - DorisExecutionOptions executionOptions, - LabelGenerator labelGenerator, - CloseableHttpClient httpClient) { + public DorisStreamLoad( + String hostPort, + DorisOptions dorisOptions, + DorisExecutionOptions executionOptions, + LabelGenerator labelGenerator, + CloseableHttpClient httpClient) { this.hostPort = hostPort; String[] tableInfo = dorisOptions.getTableIdentifier().split("\\."); this.db = tableInfo[0]; @@ -105,14 +105,27 @@ public DorisStreamLoad(String hostPort, this.streamLoadProp = executionOptions.getStreamLoadProp(); this.enableDelete = executionOptions.getDeletable(); this.httpClient = httpClient; - this.executorService = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload")); - this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount(), executionOptions.isUseCache()); + this.executorService = + new ThreadPoolExecutor( + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + new ExecutorThreadFactory("stream-load-upload")); + this.recordStream = + new RecordStream( + executionOptions.getBufferSize(), + executionOptions.getBufferCount(), + executionOptions.isUseCache()); if (streamLoadProp.getProperty(FORMAT_KEY, CSV).equals(ARROW)) { lineDelimiter = null; } else { - lineDelimiter = EscapeHandler.escapeString(streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes(); + lineDelimiter = + EscapeHandler.escapeString( + streamLoadProp.getProperty( + LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)) + .getBytes(); } loadBatchFirstRecord = true; } @@ -141,6 +154,7 @@ public Future getPendingLoadFuture() { /** * try to discard pending transactions with labels beginning with labelSuffix. + * * @param labelSuffix * @param chkID * @throws Exception @@ -150,7 +164,8 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { LOG.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID); while (true) { try { - // TODO: According to label abort txn. Currently, it can only be aborted based on txnid, + // TODO: According to label abort txn. Currently, it can only be aborted based on + // txnid, // so we must first request a streamload based on the label to get the txnid. 
String label = labelGenerator.generateTableLabel(startChkID); HttpPutBuilder builder = new HttpPutBuilder(); @@ -161,17 +176,20 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { .setLabel(label) .setEmptyEntity() .addProperties(streamLoadProp); - RespContent respContent = handlePreCommitResponse(httpClient.execute(builder.build())); + RespContent respContent = + handlePreCommitResponse(httpClient.execute(builder.build())); Preconditions.checkState("true".equals(respContent.getTwoPhaseCommit())); if (LABEL_ALREADY_EXIST.equals(respContent.getStatus())) { // label already exist and job finished if (JOB_EXIST_FINISHED.equals(respContent.getExistingJobStatus())) { - throw new DorisException("Load status is " + LABEL_ALREADY_EXIST + " and load job finished, " + - "change you label prefix or restore from latest savepoint!"); - + throw new DorisException( + "Load status is " + + LABEL_ALREADY_EXIST + + " and load job finished, " + + "change you label prefix or restore from latest savepoint!"); } // job not finished, abort. - Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage()); + Matcher matcher = LABEL_EXIST_PATTERN.matcher(respContent.getMessage()); if (matcher.find()) { Preconditions.checkState(label.equals(matcher.group(1))); long txnId = Long.parseLong(matcher.group(2)); @@ -179,7 +197,10 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { abortTransaction(txnId); } else { LOG.error("response: {}", respContent.toString()); - throw new DorisException("Load Status is " + LABEL_ALREADY_EXIST + ", but no txnID associated with it!"); + throw new DorisException( + "Load Status is " + + LABEL_ALREADY_EXIST + + ", but no txnID associated with it!"); } } else { LOG.info("abort {} for check label {}.", respContent.getTxnId(), label); @@ -197,10 +218,11 @@ public void abortPreCommit(String labelSuffix, long chkID) throws Exception { /** * write record into stream. + * * @param record * @throws IOException */ - public void writeRecord(byte[] record) throws IOException{ + public void writeRecord(byte[] record) throws IOException { if (loadBatchFirstRecord) { loadBatchFirstRecord = false; } else if (lineDelimiter != null) { @@ -214,7 +236,7 @@ public RecordStream getRecordStream() { return recordStream; } - public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception{ + public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception { final int statusCode = response.getStatusLine().getStatusCode(); if (statusCode == 200 && response.getEntity() != null) { String loadResult = EntityUtils.toString(response.getEntity()); @@ -224,12 +246,12 @@ public RespContent handlePreCommitResponse(CloseableHttpResponse response) throw throw new StreamLoadException("stream load error: " + response.getStatusLine().toString()); } - public RespContent stopLoad(String label) throws IOException{ + public RespContent stopLoad(String label) throws IOException { recordStream.endInput(); LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort); Preconditions.checkState(pendingLoadFuture != null); try { - return handlePreCommitResponse(pendingLoadFuture.get()); + return handlePreCommitResponse(pendingLoadFuture.get()); } catch (Exception e) { throw new DorisRuntimeException(e); } @@ -237,6 +259,7 @@ public RespContent stopLoad(String label) throws IOException{ /** * start write data for new checkpoint. 
+ * * @param label * @throws IOException */ @@ -247,7 +270,8 @@ public void startLoad(String label, boolean isResume) throws IOException { LOG.info("table {} stream load started for {} on host {}", table, label, hostPort); try { InputStreamEntity entity = new InputStreamEntity(recordStream); - putBuilder.setUrl(loadUrlStr) + putBuilder + .setUrl(loadUrlStr) .baseAuth(user, passwd) .addCommonHeader() .addHiddenColumns(enableDelete) @@ -255,12 +279,14 @@ public void startLoad(String label, boolean isResume) throws IOException { .setEntity(entity) .addProperties(streamLoadProp); if (enable2PC) { - putBuilder.enable2PC(); + putBuilder.enable2PC(); } - pendingLoadFuture = executorService.submit(() -> { - LOG.info("table {} start execute load", table); - return httpClient.execute(putBuilder.build()); - }); + pendingLoadFuture = + executorService.submit( + () -> { + LOG.info("table {} start execute load", table); + return httpClient.execute(putBuilder.build()); + }); } catch (Exception e) { String err = "failed to stream load data with label: " + label; LOG.warn(err, e); @@ -281,16 +307,18 @@ public void abortTransaction(long txnID) throws Exception { int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != 200 || response.getEntity() == null) { LOG.warn("abort transaction response: " + response.getStatusLine().toString()); - throw new DorisRuntimeException("Fail to abort transaction " + txnID + " with url " + abortUrlStr); + throw new DorisRuntimeException( + "Fail to abort transaction " + txnID + " with url " + abortUrlStr); } ObjectMapper mapper = new ObjectMapper(); String loadResult = EntityUtils.toString(response.getEntity()); - Map res = mapper.readValue(loadResult, new TypeReference>(){}); + Map res = + mapper.readValue(loadResult, new TypeReference>() {}); if (!SUCCESS.equals(res.get("status"))) { if (ResponseUtil.isCommitted(res.get("msg"))) { - throw new DorisException("try abort committed transaction, " + - "do you recover from old savepoint?"); + throw new DorisException( + "try abort committed transaction, " + "do you recover from old savepoint?"); } LOG.warn("Fail to abort transaction. 
txnId: {}, error: {}", txnID, res.get("msg")); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index b588542dd..f7727d607 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -17,6 +17,13 @@ package org.apache.doris.flink.sink.writer; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; +import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; @@ -29,12 +36,6 @@ import org.apache.doris.flink.sink.HttpUtil; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.api.connector.sink2.StatefulSink; -import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; -import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,12 +56,15 @@ /** * Doris Writer will load data to doris. + * * @param */ -public class DorisWriter implements StatefulSink.StatefulSinkWriter, - TwoPhaseCommittingSink.PrecommittingSinkWriter { +public class DorisWriter + implements StatefulSink.StatefulSinkWriter, + TwoPhaseCommittingSink.PrecommittingSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class); - private static final List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); + private static final List DORIS_SUCCESS_STATUS = + new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT)); private final long lastCheckpointId; private long curCheckpointId; private Map dorisStreamLoadMap = new ConcurrentHashMap<>(); @@ -79,12 +83,13 @@ public class DorisWriter implements StatefulSink.StatefulSinkWriter state, - DorisRecordSerializer serializer, - DorisOptions dorisOptions, - DorisReadOptions dorisReadOptions, - DorisExecutionOptions executionOptions) { + public DorisWriter( + Sink.InitContext initContext, + Collection state, + DorisRecordSerializer serializer, + DorisOptions dorisOptions, + DorisReadOptions dorisReadOptions, + DorisExecutionOptions executionOptions) { this.lastCheckpointId = initContext .getRestoredCheckpointId() @@ -94,7 +99,8 @@ public DorisWriter(Sink.InitContext initContext, LOG.info("labelPrefix " + executionOptions.getLabelPrefix()); this.labelPrefix = executionOptions.getLabelPrefix(); this.subtaskId = initContext.getSubtaskId(); - this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); + this.scheduledExecutorService = + new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check")); this.serializer = serializer; this.dorisOptions = dorisOptions; this.dorisReadOptions = dorisReadOptions; @@ -109,7 +115,7 @@ public 
DorisWriter(Sink.InitContext initContext, public void initializeLoad(Collection state) { this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG); try { - if(executionOptions.enabled2PC()) { + if (executionOptions.enabled2PC()) { abortLingeringTransactions(state); } } catch (Exception e) { @@ -118,35 +124,42 @@ public void initializeLoad(Collection state) { } // get main work thread. executorThread = Thread.currentThread(); - // when uploading data in streaming mode, we need to regularly detect whether there are exceptions. - scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); + // when uploading data in streaming mode, we need to regularly detect whether there are + // exceptions. + scheduledExecutorService.scheduleWithFixedDelay( + this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS); } - private void abortLingeringTransactions(Collection recoveredStates) throws Exception { + private void abortLingeringTransactions(Collection recoveredStates) + throws Exception { List alreadyAborts = new ArrayList<>(); - //abort label in state - for(DorisWriterState state : recoveredStates){ - // Todo: When the sink parallelism is reduced, - // the txn of the redundant task before aborting is also needed. - if(!state.getLabelPrefix().equals(labelPrefix)){ - LOG.warn("Label prefix from previous execution {} has changed to {}.", state.getLabelPrefix(), executionOptions.getLabelPrefix()); - } - if (state.getDatabase() == null || state.getTable() == null) { - LOG.warn("Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0."); - continue; - } - String key = state.getDatabase() + "." + state.getTable(); - DorisStreamLoad streamLoader = getStreamLoader(key); - streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); - alreadyAborts.add(state.getLabelPrefix()); + // abort label in state + for (DorisWriterState state : recoveredStates) { + // Todo: When the sink parallelism is reduced, + // the txn of the redundant task before aborting is also needed. + if (!state.getLabelPrefix().equals(labelPrefix)) { + LOG.warn( + "Label prefix from previous execution {} has changed to {}.", + state.getLabelPrefix(), + executionOptions.getLabelPrefix()); + } + if (state.getDatabase() == null || state.getTable() == null) { + LOG.warn( + "Transactions cannot be aborted when restore because the last used flink-doris-connector version less than 1.5.0."); + continue; + } + String key = state.getDatabase() + "." + state.getTable(); + DorisStreamLoad streamLoader = getStreamLoader(key); + streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId); + alreadyAborts.add(state.getLabelPrefix()); } // TODO: In a multi-table scenario, if do not restore from checkpoint, // when modify labelPrefix at startup, we cannot abort the previous label. 
- if(!alreadyAborts.contains(labelPrefix) + if (!alreadyAborts.contains(labelPrefix) && StringUtils.isNotEmpty(dorisOptions.getTableIdentifier()) - && StringUtils.isNotEmpty(labelPrefix)){ - //abort current labelPrefix + && StringUtils.isNotEmpty(labelPrefix)) { + // abort current labelPrefix DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier()); streamLoader.abortPreCommit(labelPrefix, curCheckpointId); } @@ -165,19 +178,19 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException { public void writeOneDorisRecord(DorisRecord record) throws IOException { - if(record == null || record.getRow() == null){ - //ddl or value is null + if (record == null || record.getRow() == null) { + // ddl or value is null return; } - //multi table load + // multi table load String tableKey = dorisOptions.getTableIdentifier(); - if(record.getTableIdentifier() != null){ + if (record.getTableIdentifier() != null) { tableKey = record.getTableIdentifier(); } DorisStreamLoad streamLoader = getStreamLoader(tableKey); - if(!loadingMap.containsKey(tableKey)) { + if (!loadingMap.containsKey(tableKey)) { // start stream load only when there has data LabelGenerator labelGenerator = getLabelGenerator(tableKey); String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); @@ -191,7 +204,7 @@ public void writeOneDorisRecord(DorisRecord record) throws IOException { @Override public Collection prepareCommit() throws IOException, InterruptedException { // Verify whether data is written during a checkpoint - if(!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)){ + if (!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)) { return Collections.emptyList(); } // disable exception checker before stop load. 
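For context on how the new code path is selected: DorisStreamLoad and DorisBatchStreamLoad above only switch off the line delimiter when the stream-load property FORMAT_KEY ("format") is set to ARROW, so the arrow path is opted into purely through the load properties. Below is a minimal sketch of the corresponding job configuration; the DorisExecutionOptions builder methods used here are pre-existing connector API rather than part of this patch, and the label prefix is an arbitrary placeholder.

import java.util.Properties;

import org.apache.doris.flink.cfg.DorisExecutionOptions;

public class ArrowFormatOptionsSketch {
    // Builds execution options that select the arrow stream-load format.
    public static DorisExecutionOptions arrowOptions() {
        Properties loadProps = new Properties();
        // "format" is LoadConstants.FORMAT_KEY; "arrow" makes the writers skip the line delimiter.
        loadProps.setProperty("format", "arrow");
        return DorisExecutionOptions.builder()
                .setLabelPrefix("doris_arrow_demo") // placeholder label prefix
                .setDeletable(false) // the arrow path does not write the delete-sign column
                .setStreamLoadProp(loadProps)
                .build();
    }
}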
@@ -199,9 +212,9 @@ public Collection prepareCommit() throws IOException, Interrup // submit stream load http request List committableList = new ArrayList<>(); - for(Map.Entry streamLoader : dorisStreamLoadMap.entrySet()){ + for (Map.Entry streamLoader : dorisStreamLoadMap.entrySet()) { String tableIdentifier = streamLoader.getKey(); - if(!loadingMap.getOrDefault(tableIdentifier, false)){ + if (!loadingMap.getOrDefault(tableIdentifier, false)) { LOG.debug("skip table {}, no data need to load.", tableIdentifier); continue; } @@ -210,12 +223,19 @@ public Collection prepareCommit() throws IOException, Interrup String currentLabel = labelGenerator.generateTableLabel(curCheckpointId); RespContent respContent = dorisStreamLoad.stopLoad(currentLabel); if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) { - String errMsg = String.format("tabel {} stream load error: %s, see more in %s", tableIdentifier, respContent.getMessage(), respContent.getErrorURL()); + String errMsg = + String.format( + "tabel {} stream load error: %s, see more in %s", + tableIdentifier, + respContent.getMessage(), + respContent.getErrorURL()); throw new DorisRuntimeException(errMsg); } - if(executionOptions.enabled2PC()){ + if (executionOptions.enabled2PC()) { long txnId = respContent.getTxnId(); - committableList.add(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); + committableList.add( + new DorisCommittable( + dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId)); } } // clean loadingMap @@ -226,40 +246,51 @@ public Collection prepareCommit() throws IOException, Interrup @Override public List snapshotState(long checkpointId) throws IOException { List writerStates = new ArrayList<>(); - for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){ - //Dynamic refresh backend + for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { + // Dynamic refresh backend dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend()); - DorisWriterState writerState = new DorisWriterState(labelPrefix, dorisStreamLoad.getDb(), dorisStreamLoad.getTable(), subtaskId); + DorisWriterState writerState = + new DorisWriterState( + labelPrefix, + dorisStreamLoad.getDb(), + dorisStreamLoad.getTable(), + subtaskId); writerStates.add(writerState); } this.curCheckpointId = checkpointId + 1; return writerStates; } - private LabelGenerator getLabelGenerator(String tableKey){ - return labelGeneratorMap.computeIfAbsent(tableKey, v-> new LabelGenerator(labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId)); + private LabelGenerator getLabelGenerator(String tableKey) { + return labelGeneratorMap.computeIfAbsent( + tableKey, + v -> + new LabelGenerator( + labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId)); } - private DorisStreamLoad getStreamLoader(String tableKey){ + private DorisStreamLoad getStreamLoader(String tableKey) { LabelGenerator labelGenerator = getLabelGenerator(tableKey); dorisOptions.setTableIdentifier(tableKey); - return dorisStreamLoadMap.computeIfAbsent(tableKey, v -> new DorisStreamLoad(backendUtil.getAvailableBackend(), - dorisOptions, - executionOptions, - labelGenerator, - new HttpUtil().getHttpClient())); + return dorisStreamLoadMap.computeIfAbsent( + tableKey, + v -> + new DorisStreamLoad( + backendUtil.getAvailableBackend(), + dorisOptions, + executionOptions, + labelGenerator, + new HttpUtil().getHttpClient())); } - /** - * Check the streamload http request regularly - */ + /** Check the streamload http request regularly */ 
private void checkDone() { - for(Map.Entry streamLoadMap : dorisStreamLoadMap.entrySet()){ + for (Map.Entry streamLoadMap : dorisStreamLoadMap.entrySet()) { checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); } } - private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad){ + private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad) { // the load future is done and checked in prepareCommit(). // this will check error while loading. LOG.debug("start timer checker, interval {} ms", intervalTime); @@ -283,23 +314,32 @@ private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoa dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId); } // start a new txn(stream load) - LOG.info("getting exception, breakpoint resume for checkpoint ID: {}, table {}", curCheckpointId, tableIdentifier); + LOG.info( + "getting exception, breakpoint resume for checkpoint ID: {}, table {}", + curCheckpointId, + tableIdentifier); LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier); - dorisStreamLoad.startLoad(labelGenerator.generateTableLabel(curCheckpointId), true); + dorisStreamLoad.startLoad( + labelGenerator.generateTableLabel(curCheckpointId), true); } catch (Exception e) { throw new DorisRuntimeException(e); } } else { String errorMsg; try { - RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()); + RespContent content = + dorisStreamLoad.handlePreCommitResponse( + dorisStreamLoad.getPendingLoadFuture().get()); errorMsg = content.getMessage(); } catch (Exception e) { errorMsg = e.getMessage(); } loadException = new StreamLoadException(errorMsg); - LOG.error("table {} stream load finished unexpectedly, interrupt worker thread! {}", tableIdentifier, errorMsg); + LOG.error( + "table {} stream load finished unexpectedly, interrupt worker thread! {}", + tableIdentifier, + errorMsg); // set the executor thread interrupted in case blocking in write data. executorThread.interrupt(); } @@ -330,11 +370,10 @@ public void close() throws Exception { scheduledExecutorService.shutdownNow(); } if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) { - for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){ + for (DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()) { dorisStreamLoad.close(); } } serializer.close(); } - } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java index e450cebb8..11c7e1c44 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java @@ -22,12 +22,14 @@ /** * How to serialize the record to bytes. + * * @param */ public interface DorisRecordSerializer extends Serializable { /** * define how to convert record into byte array. 
+ * * @param record * @return [tableIdentifer,byte array] * @throws IOException diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java index 1e41bb54d..f15326bb1 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java @@ -17,9 +17,6 @@ package org.apache.doris.flink.sink.writer.serializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.doris.flink.deserialization.converter.DorisRowConverter; -import org.apache.doris.flink.sink.EscapeHandler; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; import org.apache.flink.table.types.DataType; @@ -28,6 +25,10 @@ import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.deserialization.converter.DorisRowConverter; +import org.apache.doris.flink.sink.EscapeHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,9 +46,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.JSON; import static org.apache.doris.flink.sink.writer.LoadConstants.NULL_VALUE; -/** - * Serializer for RowData. - */ +/** Serializer for RowData. */ public class RowDataSerializer implements DorisRecordSerializer { private static final Logger LOG = LoggerFactory.getLogger(RowDataSerializer.class); String[] fieldNames; @@ -62,7 +61,12 @@ public class RowDataSerializer implements DorisRecordSerializer { private int arrowWriteCnt = 0; private final DataType[] dataTypes; - private RowDataSerializer(String[] fieldNames, DataType[] dataTypes, String type, String fieldDelimiter, boolean enableDelete) { + private RowDataSerializer( + String[] fieldNames, + DataType[] dataTypes, + String type, + String fieldDelimiter, + boolean enableDelete) { this.fieldNames = fieldNames; this.type = type; this.fieldDelimiter = fieldDelimiter; @@ -90,7 +94,7 @@ public void initial() { } @Override - public DorisRecord serialize(RowData record) throws IOException{ + public DorisRecord serialize(RowData record) throws IOException { int maxIndex = Math.min(record.getArity(), fieldNames.length); String valString; if (JSON.equals(type)) { @@ -189,9 +193,7 @@ public static Builder builder() { return new Builder(); } - /** - * Builder for RowDataSerializer. - */ + /** Builder for RowDataSerializer. 
*/ public static class Builder { private String[] fieldNames; private DataType[] dataTypes; @@ -225,7 +227,10 @@ public Builder enableDelete(boolean deletable) { } public RowDataSerializer build() { - Preconditions.checkState(CSV.equals(type) && fieldDelimiter != null || JSON.equals(type) || ARROW.equals(type)); + Preconditions.checkState( + CSV.equals(type) && fieldDelimiter != null + || JSON.equals(type) + || ARROW.equals(type)); Preconditions.checkNotNull(dataTypes); Preconditions.checkNotNull(fieldNames); Preconditions.checkArgument(ARROW.equals(type) && !deletable); diff --git a/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java index d7c2f430f..29f809f1c 100644 --- a/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java @@ -18,26 +18,26 @@ package org.apache.flink.table.runtime.arrow.serializers; +import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator; import org.apache.flink.api.python.shaded.org.apache.arrow.memory.RootAllocator; +import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.arrow.ArrowReader; import org.apache.flink.table.runtime.arrow.ArrowUtils; import org.apache.flink.table.runtime.arrow.ArrowWriter; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.api.python.shaded.org.apache.arrow.memory.BufferAllocator; -import org.apache.flink.api.python.shaded.org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamReader; -import org.apache.flink.api.python.shaded.org.apache.arrow.vector.ipc.ArrowStreamWriter; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; /** * this code is copied from flink-python, and modified finishCurrentBatch to add end operation. - *

- * The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow bytes. + * + *

The base class ArrowSerializer which will serialize/deserialize RowType data to/from arrow + * bytes. */ public final class ArrowSerializer { diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java index 951250d4a..84e297153 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestRowDataSerializer.java @@ -17,20 +17,20 @@ package org.apache.doris.flink.sink.writer; -import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; -import org.apache.flink.table.types.DataType; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; - +import org.apache.flink.table.runtime.arrow.serializers.ArrowSerializer; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import org.apache.flink.types.RowKind; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -41,9 +41,7 @@ import java.nio.charset.StandardCharsets; import java.util.Map; -/** - * test for RowDataSerializer. - */ +/** test for RowDataSerializer. 
*/ public class TestRowDataSerializer { static GenericRowData rowData; static DataType[] dataTypes; @@ -56,14 +54,18 @@ public static void setUp() { rowData.setField(1, StringData.fromString("test")); rowData.setField(2, 60.2); rowData.setRowKind(RowKind.INSERT); - dataTypes = new DataType[]{DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()}; - fieldNames = new String[]{"id", "name", "weight"}; + dataTypes = new DataType[] {DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()}; + fieldNames = new String[] {"id", "name", "weight"}; } @Test public void testSerializeCsv() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(false); + builder.setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("csv") + .setFieldDelimiter("|") + .enableDelete(false); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); Assert.assertArrayEquals("3|test|60.2".getBytes(StandardCharsets.UTF_8), serializedValue); @@ -72,11 +74,18 @@ public void testSerializeCsv() throws IOException { @Test public void testSerializeJson() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(false); + builder.setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("json") + .setFieldDelimiter("|") + .enableDelete(false); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); ObjectMapper objectMapper = new ObjectMapper(); - Map valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference>(){}); + Map valueMap = + objectMapper.readValue( + new String(serializedValue, StandardCharsets.UTF_8), + new TypeReference>() {}); Assert.assertEquals("3", valueMap.get("id")); Assert.assertEquals("test", valueMap.get("name")); Assert.assertEquals("60.2", valueMap.get("weight")); @@ -85,7 +94,11 @@ public void testSerializeJson() throws IOException { @Test public void testSerializeCsvWithSign() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("csv").setFieldDelimiter("|").enableDelete(true); + builder.setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("csv") + .setFieldDelimiter("|") + .enableDelete(true); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); Assert.assertArrayEquals("3|test|60.2|0".getBytes(StandardCharsets.UTF_8), serializedValue); @@ -94,11 +107,18 @@ public void testSerializeCsvWithSign() throws IOException { @Test public void testSerializeJsonWithSign() throws IOException { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); + builder.setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("json") + .setFieldDelimiter("|") + .enableDelete(true); RowDataSerializer serializer = builder.build(); byte[] serializedValue = serializer.serialize(rowData).getRow(); ObjectMapper objectMapper = new ObjectMapper(); - Map valueMap = objectMapper.readValue(new String(serializedValue, StandardCharsets.UTF_8), new TypeReference>(){}); + Map valueMap = + 
objectMapper.readValue( + new String(serializedValue, StandardCharsets.UTF_8), + new TypeReference>() {}); Assert.assertEquals("3", valueMap.get("id")); Assert.assertEquals("test", valueMap.get("name")); Assert.assertEquals("60.2", valueMap.get("weight")); @@ -108,21 +128,27 @@ public void testSerializeJsonWithSign() throws IOException { @Test public void testParseDeleteSign() { RowDataSerializer.Builder builder = RowDataSerializer.builder(); - builder.setFieldNames(fieldNames).setFieldType(dataTypes).setType("json").setFieldDelimiter("|").enableDelete(true); + builder.setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("json") + .setFieldDelimiter("|") + .enableDelete(true); RowDataSerializer serializer = builder.build(); Assert.assertEquals("0", serializer.parseDeleteSign(RowKind.INSERT)); Assert.assertEquals("0", serializer.parseDeleteSign(RowKind.UPDATE_AFTER)); Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.DELETE)); Assert.assertEquals("1", serializer.parseDeleteSign(RowKind.UPDATE_BEFORE)); } + @Test public void testArrowType() throws Exception { - RowDataSerializer serializer = RowDataSerializer.builder() - .setFieldNames(fieldNames) - .setFieldType(dataTypes) - .setType("arrow") - .enableDelete(false) - .build(); + RowDataSerializer serializer = + RowDataSerializer.builder() + .setFieldNames(fieldNames) + .setFieldType(dataTypes) + .setType("arrow") + .enableDelete(false) + .build(); // write data to binary serializer.initial(); From 38df7ddb6d5794ac4fb768b6b9d63a22e656cd44 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 20 Dec 2023 16:25:49 +0800 Subject: [PATCH 3/7] checkstyle --- .../apache/doris/flink/sink/batch/BatchRecordBuffer.java | 3 ++- .../doris/flink/sink/batch/DorisBatchStreamLoad.java | 4 ++-- .../org/apache/doris/flink/sink/writer/DorisWriter.java | 4 ++-- .../doris/flink/sink/writer/serializer/DorisRecord.java | 2 +- .../sink/writer/serializer/DorisRecordSerializer.java | 2 +- .../flink/sink/writer/serializer/RowDataSerializer.java | 8 ++++---- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java index 45e683df5..df40e7a9b 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; -/** buffer to queue */ +/** buffer to queue. 
*/ public class BatchRecordBuffer { private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class); public static final String LINE_SEPARATOR = "\n"; @@ -124,6 +124,7 @@ public void clear() { public ByteBuffer getBuffer() { return buffer; } + /** @return Number of records in this buffer */ public int getNumOfRecords() { return numOfRecords; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index 03b9fd9ff..f32ce2c9e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -67,7 +67,7 @@ import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT; import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY; -/** async stream load */ +/** async stream load. */ public class DorisBatchStreamLoad implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DorisBatchStreamLoad.class); @@ -255,7 +255,7 @@ public void run() { loadThreadAlive = false; } - /** execute stream load */ + /** execute stream load. */ public void load(String label, BatchRecordBuffer buffer) throws IOException { refreshLoadUrl(buffer.getDatabase(), buffer.getTable()); ByteBuffer data = buffer.getData(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index f7727d607..e8cc1ff3c 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -68,7 +68,7 @@ public class DorisWriter private final long lastCheckpointId; private long curCheckpointId; private Map dorisStreamLoadMap = new ConcurrentHashMap<>(); - private Map labelGeneratorMap = new ConcurrentHashMap<>();; + private Map labelGeneratorMap = new ConcurrentHashMap<>(); volatile boolean globalLoading; private Map loadingMap = new ConcurrentHashMap<>(); private final DorisOptions dorisOptions; @@ -283,7 +283,7 @@ private DorisStreamLoad getStreamLoader(String tableKey) { new HttpUtil().getHttpClient())); } - /** Check the streamload http request regularly */ + /** Check the streamload http request regularly. 
*/ private void checkDone() { for (Map.Entry streamLoadMap : dorisStreamLoadMap.entrySet()) { checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue()); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java index 9de4c958f..6a5bdde5f 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecord.java @@ -21,7 +21,7 @@ public class DorisRecord implements Serializable { - public static DorisRecord EMPTY = new DorisRecord(); + public static DorisRecord empty = new DorisRecord(); private String database; private String table; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java index 11c7e1c44..5582ea5f8 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/DorisRecordSerializer.java @@ -39,7 +39,7 @@ public interface DorisRecordSerializer extends Serializable { default void initial() {} default DorisRecord flush() { - return DorisRecord.EMPTY; + return DorisRecord.empty; } default void close() throws Exception {} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java index f15326bb1..ec513f137 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java @@ -105,7 +105,7 @@ public DorisRecord serialize(RowData record) throws IOException { arrowWriteCnt += 1; arrowSerializer.write(record); if (arrowWriteCnt < arrowBatchCnt) { - return DorisRecord.EMPTY; + return DorisRecord.empty; } return arrowToDorisRecord(); } else { @@ -117,7 +117,7 @@ public DorisRecord serialize(RowData record) throws IOException { @Override public DorisRecord flush() { if (JSON.equals(type) || CSV.equals(type)) { - return DorisRecord.EMPTY; + return DorisRecord.empty; } else if (ARROW.equals(type)) { return arrowToDorisRecord(); } else { @@ -134,7 +134,7 @@ public void close() throws Exception { public DorisRecord arrowToDorisRecord() { if (arrowWriteCnt == 0) { - return DorisRecord.EMPTY; + return DorisRecord.empty; } arrowWriteCnt = 0; try { @@ -146,7 +146,7 @@ public DorisRecord arrowToDorisRecord() { } catch (Exception e) { LOG.error("Failed to convert arrow batch:", e); } - return DorisRecord.EMPTY; + return DorisRecord.empty; } public String buildJsonString(RowData record, int maxIndex) throws IOException { From cae6ae5ba4b71c67bbdfd33469808806d6aef974 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Tue, 26 Dec 2023 17:04:42 +0800 Subject: [PATCH 4/7] 1. set flink-python_2.12 for flink1.15.0 2. 
change the precondition for arrow --- flink-doris-connector/build.sh | 4 +++- flink-doris-connector/pom.xml | 3 ++- .../doris/flink/sink/writer/serializer/RowDataSerializer.java | 4 +++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/build.sh b/flink-doris-connector/build.sh index a646f5859..1c67f2b24 100755 --- a/flink-doris-connector/build.sh +++ b/flink-doris-connector/build.sh @@ -142,8 +142,10 @@ selectFlink() { FLINK_VERSION=0 selectFlink flinkVer=$? +FLINK_PYTHON_ID="flink-python" if [ ${flinkVer} -eq 1 ]; then FLINK_VERSION="1.15.0" + FLINK_PYTHON_ID="flink-python_2.12" elif [ ${flinkVer} -eq 2 ]; then FLINK_VERSION="1.16.0" elif [ ${flinkVer} -eq 3 ]; then @@ -160,7 +162,7 @@ FLINK_MAJOR_VERSION=0 echo_g " flink version: ${FLINK_VERSION}, major version: ${FLINK_MAJOR_VERSION}" echo_g " build starting..." -${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} "$@" +${MVN_BIN} clean package -Dflink.version=${FLINK_VERSION} -Dflink.major.version=${FLINK_MAJOR_VERSION} -Dflink.python.id=${FLINK_PYTHON_ID} "$@" EXIT_CODE=$? if [ $EXIT_CODE -eq 0 ]; then diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index a2550e13a..cc618bdf1 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -71,6 +71,7 @@ under the License. 1.18.0 1.18 2.4.2 + flink-python 0.16.0 13.0.0 3.10.1 @@ -139,7 +140,7 @@ under the License. org.apache.flink - flink-python + ${flink.python.id} ${flink.version} provided diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java index ec513f137..f7d987484 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/RowDataSerializer.java @@ -233,7 +233,9 @@ public RowDataSerializer build() { || ARROW.equals(type)); Preconditions.checkNotNull(dataTypes); Preconditions.checkNotNull(fieldNames); - Preconditions.checkArgument(ARROW.equals(type) && !deletable); + if (ARROW.equals(type)) { + Preconditions.checkArgument(!deletable); + } return new RowDataSerializer(fieldNames, dataTypes, type, fieldDelimiter, deletable); } } From c51df1407e2668f5c54f3b63a616a9a16e3d7dec Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 27 Dec 2023 10:10:34 +0800 Subject: [PATCH 5/7] =?UTF-8?q?arrow=E8=87=AA=E5=B8=A6=E7=9A=84netty-commo?= =?UTF-8?q?n=E6=98=AF4.1.96=EF=BC=8C=E6=89=80=E4=BB=A5=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=8E=9F=E6=9D=A5=E7=9A=844.1.77=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- flink-doris-connector/pom.xml | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index cc618bdf1..7d28f0630 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -85,7 +85,6 @@ under the License. 2.4.2 4.5.13 1.15 - 4.1.77.Final 2.13.3 31.1-jre 1.7.25 @@ -203,19 +202,9 @@ under the License. 
com.fasterxml.jackson.core jackson-databind - - io.netty - netty-common - - - io.netty - netty-common - ${netty.version} - - com.fasterxml.jackson.core From 9d8fa677c7c37f4ca10ff1b80e8d6740ae81b720 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 27 Dec 2023 10:44:57 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=B9=B6=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0flink=E7=89=88=E6=9C=AC=E7=BC=96=E8=AF=91=E5=91=BD?= =?UTF-8?q?=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/build-extension.yml | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml index 2038d21b1..ce159708c 100644 --- a/.github/workflows/build-extension.yml +++ b/.github/workflows/build-extension.yml @@ -42,5 +42,26 @@ jobs: run: | cd flink-doris-connector && mvn clean package \ -Dflink.version=1.15.0 \ - -Dflink.minor.version=1.15 + -Dflink.minor.version=1.15 \ + -Dflink.python.id=flink-python_2.12 + - name: Build flink connector 1.16 + run: | + mvn clean package \ + -Dflink.version=1.16.0 \ + -Dflink.minor.version=1.16 \ + -Dflink.python.id=flink-python + + - name: Build flink connector 1.17 + run: | + mvn clean package \ + -Dflink.version=1.17.0 \ + -Dflink.minor.version=1.17 \ + -Dflink.python.id=flink-python + + - name: Build flink connector 1.18 + run: | + mvn clean package \ + -Dflink.version=1.18.0 \ + -Dflink.minor.version=1.18 \ + -Dflink.python.id=flink-python From 2f601b35c214bf1d322a979337ad8e8d06110a0f Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 27 Dec 2023 10:49:37 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=B9=B6=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0flink=E7=89=88=E6=9C=AC=E7=BC=96=E8=AF=91=E5=91=BD?= =?UTF-8?q?=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/build-extension.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml index ce159708c..7259bb43a 100644 --- a/.github/workflows/build-extension.yml +++ b/.github/workflows/build-extension.yml @@ -47,21 +47,21 @@ jobs: - name: Build flink connector 1.16 run: | - mvn clean package \ + cd flink-doris-connector && mvn clean package \ -Dflink.version=1.16.0 \ -Dflink.minor.version=1.16 \ -Dflink.python.id=flink-python - name: Build flink connector 1.17 run: | - mvn clean package \ + cd flink-doris-connector && mvn clean package \ -Dflink.version=1.17.0 \ -Dflink.minor.version=1.17 \ -Dflink.python.id=flink-python - name: Build flink connector 1.18 run: | - mvn clean package \ + cd flink-doris-connector && mvn clean package \ -Dflink.version=1.18.0 \ -Dflink.minor.version=1.18 \ -Dflink.python.id=flink-python
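---

Usage reference (not part of the patches above): the sketch below drives RowDataSerializer in the new "arrow" mode end to end, modeled on the testArrowType case added to TestRowDataSerializer in this series. The demo class name and the sample row values are illustrative assumptions only; the internal arrow batch size is not shown in the patches, so serialize() is treated here as possibly returning an empty record until flush().

    import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
    import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.RowKind;

    /** Hypothetical demo class; only the builder/serialize/flush calls mirror the patched code. */
    public class ArrowSerializeDemo {
        public static void main(String[] args) throws Exception {
            String[] fieldNames = {"id", "name", "weight"};
            DataType[] dataTypes = {DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE()};

            // "arrow" is the new load format; the builder rejects enableDelete(true) for arrow.
            RowDataSerializer serializer =
                    RowDataSerializer.builder()
                            .setFieldNames(fieldNames)
                            .setFieldType(dataTypes)
                            .setType("arrow")
                            .enableDelete(false)
                            .build();
            // Sets up the underlying ArrowSerializer before any rows are written.
            serializer.initial();

            GenericRowData row = new GenericRowData(3);
            row.setField(0, 3);
            row.setField(1, StringData.fromString("test"));
            row.setField(2, 60.2);
            row.setRowKind(RowKind.INSERT);

            // Rows are buffered into an arrow batch, so this may return an empty DorisRecord
            // until the internal batch count is reached.
            DorisRecord intermediate = serializer.serialize(row);

            // flush() converts whatever is still buffered into a single arrow payload.
            DorisRecord batch = serializer.flush();
            byte[] arrowBytes = batch.getRow();
            System.out.println("arrow payload bytes: " + (arrowBytes == null ? 0 : arrowBytes.length));

            serializer.close();
        }
    }

Per the precondition adjusted in RowDataSerializer.Builder#build, the arrow type cannot be combined with the delete sign, so enableDelete must stay false. Note also that the arrow path relies on the shaded Arrow classes from the flink-python artifact, which the pom now declares as a provided dependency (flink-python_2.12 for Flink 1.15, flink-python for 1.16 and later).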