
Commit 7e6e856: checkstyle
wuwenchi committed Dec 18, 2023
1 parent a676302 commit 7e6e856
Showing 9 changed files with 382 additions and 244 deletions.
BatchRecordBuffer.java
@@ -18,15 +18,13 @@
package org.apache.doris.flink.sink.batch;

import org.apache.flink.annotation.VisibleForTesting;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;

-
-/**
- * buffer to queue
- */
+/** buffer to queue */
public class BatchRecordBuffer {
private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
public static final String LINE_SEPARATOR = "\n";
@@ -39,7 +37,7 @@ public class BatchRecordBuffer {
private String database;
private String table;

-    public BatchRecordBuffer(){}
+    public BatchRecordBuffer() {}

public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
super();
@@ -57,7 +55,7 @@ public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, in

public void insert(byte[] record) {
ensureCapacity(record.length);
-        if(loadBatchFirstRecord) {
+        if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else if (lineDelimiter != null) {
this.buffer.put(this.lineDelimiter);
@@ -70,7 +68,7 @@ public void insert(byte[] record) {
@VisibleForTesting
public void ensureCapacity(int length) {
int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
-        if(buffer.remaining() - lineDelimiterSize >= length){
+        if (buffer.remaining() - lineDelimiterSize >= length) {
return;
}
int currentRemain = buffer.remaining();
@@ -87,7 +85,12 @@ public void ensureCapacity(int length) {
tmp.put(buffer);
buffer.clear();
buffer = tmp;
-        LOG.info("record length {},buffer remain {} ,grow capacity {} to {}", length, currentRemain, currentCapacity, newCapacity);
+        LOG.info(
+                "record length {},buffer remain {} ,grow capacity {} to {}",
+                length,
+                currentRemain,
+                currentCapacity,
+                newCapacity);
}

public String getLabelName() {
@@ -98,55 +101,45 @@ public void setLabelName(String labelName) {
this.labelName = labelName;
}

-    /**
-     * @return true if buffer is empty
-     */
+    /** @return true if buffer is empty */
public boolean isEmpty() {
return numOfRecords == 0;
}

public ByteBuffer getData() {
-        //change mode
+        // change mode
buffer.flip();
-        LOG.debug("flush buffer: {} records, {} bytes",getNumOfRecords(),getBufferSizeBytes());
+        LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(), getBufferSizeBytes());
return buffer;
}

-    public void clear(){
+    public void clear() {
this.buffer.clear();
this.numOfRecords = 0;
this.bufferSizeBytes = 0;
this.labelName = null;
this.loadBatchFirstRecord = true;
}

-    public ByteBuffer getBuffer(){
+    public ByteBuffer getBuffer() {
return buffer;
}
-    /**
-     * @return Number of records in this buffer
-     */
+    /** @return Number of records in this buffer */
public int getNumOfRecords() {
return numOfRecords;
}

-    /**
-     * @return Buffer size in bytes
-     */
+    /** @return Buffer size in bytes */
public int getBufferSizeBytes() {
return bufferSizeBytes;
}

-    /**
-     * @param numOfRecords Updates number of records (Usually by 1)
-     */
+    /** @param numOfRecords Updates number of records (Usually by 1) */
public void setNumOfRecords(int numOfRecords) {
this.numOfRecords = numOfRecords;
}

-    /**
-     * @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes)
-     */
+    /** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */
public void setBufferSizeBytes(int bufferSizeBytes) {
this.bufferSizeBytes = bufferSizeBytes;
}
DorisBatchStreamLoad.java
@@ -17,6 +17,8 @@

package org.apache.doris.flink.sink.batch;

+import org.apache.flink.util.Preconditions;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -30,7 +32,6 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
-import org.apache.flink.util.Preconditions;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -66,14 +67,13 @@
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;

-/**
- * async stream load
- **/
+/** async stream load */
public class DorisBatchStreamLoad implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DorisBatchStreamLoad.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+    private static final List<String> DORIS_SUCCESS_STATUS =
+            new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
@@ -93,13 +93,16 @@ public class DorisBatchStreamLoad implements Serializable {
private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();
private BackendUtil backendUtil;

-    public DorisBatchStreamLoad(DorisOptions dorisOptions,
-                                DorisReadOptions dorisReadOptions,
-                                DorisExecutionOptions executionOptions,
-                                LabelGenerator labelGenerator) {
-        this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
-                dorisOptions.getBenodes())
-                : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+    public DorisBatchStreamLoad(
+            DorisOptions dorisOptions,
+            DorisReadOptions dorisReadOptions,
+            DorisExecutionOptions executionOptions,
+            LabelGenerator labelGenerator) {
+        this.backendUtil =
+                StringUtils.isNotEmpty(dorisOptions.getBenodes())
+                        ? new BackendUtil(dorisOptions.getBenodes())
+                        : new BackendUtil(
+                                RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.hostPort = backendUtil.getAvailableBackend();
this.username = dorisOptions.getUsername();
this.password = dorisOptions.getPassword();
@@ -108,39 +111,65 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
if (loadProps.getProperty(FORMAT_KEY, CSV).equals(ARROW)) {
this.lineDelimiter = null;
} else {
-            this.lineDelimiter = EscapeHandler.escapeString(loadProps.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)).getBytes();
+            this.lineDelimiter =
+                    EscapeHandler.escapeString(
+                                    loadProps.getProperty(
+                                            LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT))
+                            .getBytes();
}
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
-        if(StringUtils.isNotBlank(dorisOptions.getTableIdentifier())){
+        if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
-            Preconditions.checkState(tableInfo.length == 2, "tableIdentifier input error, the format is database.table");
+            Preconditions.checkState(
+                    tableInfo.length == 2,
+                    "tableIdentifier input error, the format is database.table");
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort, tableInfo[0], tableInfo[1]);
}
-        this.loadAsyncExecutor= new LoadAsyncExecutor();
-        this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy());
+        this.loadAsyncExecutor = new LoadAsyncExecutor();
+        this.loadExecutorService =
+                new ThreadPoolExecutor(
+                        1,
+                        1,
+                        0L,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(1),
+                        new DefaultThreadFactory("streamload-executor"),
+                        new ThreadPoolExecutor.AbortPolicy());
this.started = new AtomicBoolean(true);
this.loadExecutorService.execute(loadAsyncExecutor);
}

/**
* write record into cache.
*
* @param record
* @throws IOException
*/
-    public synchronized void writeRecord(String database, String table, byte[] record) throws InterruptedException {
+    public synchronized void writeRecord(String database, String table, byte[] record)
+            throws InterruptedException {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
-        BatchRecordBuffer buffer = bufferMap.computeIfAbsent(bufferKey, k -> new BatchRecordBuffer(database, table, this.lineDelimiter, executionOptions.getBufferFlushMaxBytes()));
+        BatchRecordBuffer buffer =
+                bufferMap.computeIfAbsent(
+                        bufferKey,
+                        k ->
+                                new BatchRecordBuffer(
+                                        database,
+                                        table,
+                                        this.lineDelimiter,
+                                        executionOptions.getBufferFlushMaxBytes()));
buffer.insert(record);
-        //When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
+        // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
-                || (executionOptions.getBufferFlushMaxRows() != 0 && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
-            flush(bufferKey,false);
+                || (executionOptions.getBufferFlushMaxRows() != 0
+                        && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
+            flush(bufferKey, false);
}
}

-    public synchronized void flush(String bufferKey, boolean waitUtilDone) throws InterruptedException {
+    public synchronized void flush(String bufferKey, boolean waitUtilDone)
+            throws InterruptedException {
checkFlushException();
if (null == bufferKey) {
for (String key : bufferMap.keySet()) {
@@ -162,9 +191,9 @@ private synchronized void flushBuffer(String bufferKey) {
bufferMap.remove(bufferKey);
}

-    private void putRecordToFlushQueue(BatchRecordBuffer buffer){
+    private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
checkFlushException();
-        if(!loadThreadAlive){
+        if (!loadThreadAlive) {
throw new RuntimeException("load thread already exit, write was interrupted");
}
try {
@@ -181,7 +210,7 @@ private void checkFlushException() {
}

private void waitAsyncLoadFinish() {
-        for(int i = 0; i < executionOptions.getFlushQueueSize() + 1 ; i++){
+        for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) {
BatchRecordBuffer empty = new BatchRecordBuffer();
putRecordToFlushQueue(empty);
}
@@ -191,11 +220,11 @@ private String getTableIdentifier(String database, String table) {
return database + "." + table;
}

-    public void close(){
-        //close async executor
+    public void close() {
+        // close async executor
this.loadExecutorService.shutdown();
this.started.set(false);
-        //clear buffer
+        // clear buffer
this.flushQueue.clear();
}

@@ -208,7 +237,7 @@ public void run() {
BatchRecordBuffer buffer = null;
try {
buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
-                    if(buffer == null){
+                    if (buffer == null) {
continue;
}
if (buffer.getLabelName() != null) {
@@ -217,7 +246,7 @@ public void run() {
} catch (Exception e) {
LOG.error("worker running error", e);
exception.set(e);
-                    //clear queue to avoid writer thread blocking
+                    // clear queue to avoid writer thread blocking
flushQueue.clear();
break;
}
@@ -226,15 +255,15 @@ public void run() {
loadThreadAlive = false;
}

-        /**
-         * execute stream load
-         */
-        public void load(String label, BatchRecordBuffer buffer) throws IOException{
+        /** execute stream load */
+        public void load(String label, BatchRecordBuffer buffer) throws IOException {
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
ByteBuffer data = buffer.getData();
-            ByteArrayEntity entity = new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+            ByteArrayEntity entity =
+                    new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
HttpPutBuilder putBuilder = new HttpPutBuilder();
-            putBuilder.setUrl(loadUrl)
+            putBuilder
+                    .setUrl(loadUrl)
.baseAuth(username, password)
.setLabel(label)
.addCommonHeader()
@@ -250,21 +279,27 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException{
if (statusCode == 200 && response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
LOG.info("load Result {}", loadResult);
-                        RespContent respContent = OBJECT_MAPPER.readValue(loadResult, RespContent.class);
+                        RespContent respContent =
+                                OBJECT_MAPPER.readValue(loadResult, RespContent.class);
if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-                            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
+                            String errMsg =
+                                    String.format(
+                                            "stream load error: %s, see more in %s",
+                                            respContent.getMessage(), respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
-                        }else{
+                        } else {
return;
}
}
-                    LOG.error("stream load failed with {}, reason {}, to retry", hostPort, response.getStatusLine().toString());
-                }catch (Exception ex){
+                    LOG.error(
+                            "stream load failed with {}, reason {}, to retry",
+                            hostPort,
+                            response.getStatusLine().toString());
+                } catch (Exception ex) {
if (retry == executionOptions.getMaxRetries()) {
throw new DorisBatchLoadException("stream load error: ", ex);
}
LOG.error("stream load error with {}, to retry, cause by", hostPort, ex);
-
}
retry++;
// get available backend retry
@@ -275,7 +310,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException{
buffer = null;
}

-        private void refreshLoadUrl(String database, String table){
+        private void refreshLoadUrl(String database, String table) {
hostPort = backendUtil.getAvailableBackend();
loadUrl = String.format(LOAD_URL_PATTERN, hostPort, database, table);
}
(Diff truncated: the remaining 7 changed files are not shown.)
