Skip to content

Commit

Permalink
add mode for kettle
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Nov 13, 2024
1 parent d6476dd commit 3f994c1
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
.setFormatMeta(data.formatMeta)
.setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT))
.setLogChannelInterface(log)
.setDeletable(options.isDeletable())
.build();
}

Expand Down Expand Up @@ -120,6 +121,8 @@ private void closeOutput() throws Exception {
public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (DorisStreamLoaderMeta) smi;
data = (DorisStreamLoaderData) sdi;
logDebug("Initializing step with meta : " + meta.toString());

if (super.init(smi, sdi)){
Properties streamHeaders = new Properties();
String streamLoadProp = meta.getStreamLoadProp();
Expand All @@ -141,7 +144,10 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
.withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes())
.withBufferFlushMaxRows(meta.getBufferFlushMaxRows())
.withMaxRetries(meta.getMaxRetries())
.withStreamLoadProp(streamHeaders).build();
.withStreamLoadProp(streamHeaders)
.withDeletable(meta.isDeletable()).build();

logDetailed("Initializing step with options: " + options.toString());
streamLoad = new DorisBatchStreamLoad(options, log);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
description = "BaseStep.TypeTooltipDesc.DorisStreamLoader",
categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.Bulk",
image = "doris.svg",
documentationUrl = "https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual/",
documentationUrl = "https://doris.apache.org/docs/dev/ecosystem/kettle/",
i18nPackageName = "org.pentaho.di.trans.steps.dorisstreamloader" )
@InjectionSupported( localizationPrefix = "DorisStreamLoader.Injection.", groups = { "FIELDS" } )
public class DorisStreamLoaderMeta extends BaseStepMeta implements StepMetaInterface {
Expand Down Expand Up @@ -84,6 +84,8 @@ public class DorisStreamLoaderMeta extends BaseStepMeta implements StepMetaInter

private int maxRetries;

private boolean deletable;

/** Field name of the target table */
@Injection( name = "FIELD_TABLE", group = "FIELDS" )
private String[] fieldTable;
Expand Down Expand Up @@ -111,8 +113,8 @@ private void readData( Node stepnode, List<? extends SharedObjectInterface> data
bufferFlushMaxRows = Long.valueOf(XMLHandler.getTagValue(stepnode, "bufferFlushMaxRows"));
bufferFlushMaxBytes = Long.valueOf(XMLHandler.getTagValue(stepnode, "bufferFlushMaxBytes"));
maxRetries = Integer.valueOf(XMLHandler.getTagValue(stepnode, "maxRetries"));

streamLoadProp = XMLHandler.getTagValue(stepnode, "streamLoadProp");
deletable = "Y".equalsIgnoreCase(XMLHandler.getTagValue(stepnode, "deletable"));

// Field data mapping
int nrvalues = XMLHandler.countNodes(stepnode, "mapping");
Expand Down Expand Up @@ -145,7 +147,7 @@ public void setDefault() {
bufferFlushMaxBytes = 10 * 1024 * 1024;
maxRetries = 3;
streamLoadProp = "format:json;read_json_by_line:true";

deletable = false;
allocate(0);
}

Expand All @@ -161,6 +163,7 @@ public String getXML() {
retval.append(" ").append(XMLHandler.addTagValue("bufferFlushMaxBytes", bufferFlushMaxBytes));
retval.append(" ").append(XMLHandler.addTagValue("maxRetries", maxRetries));
retval.append(" ").append(XMLHandler.addTagValue("streamLoadProp", streamLoadProp));
retval.append(" ").append(XMLHandler.addTagValue("deletable", deletable));

for (int i = 0; i < fieldTable.length; i++) {
retval.append(" <mapping>").append(Const.CR);
Expand Down Expand Up @@ -189,6 +192,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis
maxRetries = Integer.valueOf(rep.getStepAttributeString(id_step, "maxRetries"));

streamLoadProp = rep.getStepAttributeString(id_step, "streamLoadProp");
deletable = rep.getStepAttributeBoolean(id_step, "deletable");
int nrvalues = rep.countNrStepAttributes(id_step, "stream_name");
allocate(nrvalues);

Expand Down Expand Up @@ -217,6 +221,7 @@ public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transform
rep.saveStepAttribute(id_transformation, id_step, "bufferFlushMaxRows", bufferFlushMaxRows);
rep.saveStepAttribute(id_transformation, id_step, "bufferFlushMaxBytes", bufferFlushMaxBytes);
rep.saveStepAttribute(id_transformation, id_step, "maxRetries", maxRetries);
rep.saveStepAttribute(id_transformation, id_step, "deletable", deletable);

for (int i = 0; i < fieldTable.length; i++) {
rep.saveStepAttribute(id_transformation, id_step, i, "stream_name", fieldTable[i]);
Expand Down Expand Up @@ -328,7 +333,15 @@ public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}

public String[] getFieldTable() {
/**
 * Returns the current value of the {@code deletable} flag held by this step metadata.
 *
 * @return the stored {@code deletable} flag
 */
public boolean isDeletable() {
    return this.deletable;
}

/**
 * Stores the {@code deletable} flag on this step metadata.
 *
 * @param deletable the new value of the flag
 */
public void setDeletable(boolean deletable) {
    this.deletable = deletable;
}

/**
 * Returns the array of target-table field names.
 *
 * <p>The internal array itself is returned, not a copy.
 *
 * @return the {@code fieldTable} array
 */
public String[] getFieldTable() {
    return this.fieldTable;
}

Expand Down Expand Up @@ -361,6 +374,7 @@ public String toString() {
", bufferFlushMaxRows=" + bufferFlushMaxRows +
", bufferFlushMaxBytes=" + bufferFlushMaxBytes +
", maxRetries=" + maxRetries +
", deletable=" + deletable +
", fieldTable=" + Arrays.toString(fieldTable) +
", fieldStream=" + Arrays.toString(fieldStream) +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.pentaho.di.trans.steps.dorisstreamloader.load;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -203,6 +204,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor
lock.lock();
try {
while (currentCacheBytes.get() >= maxBlockedBytes) {
checkFlushException();
log.logDetailed(
"Cache full, waiting for flush, currentBytes: " + currentCacheBytes.get()
+ ", maxBlockedBytes: " + maxBlockedBytes);
Expand Down Expand Up @@ -448,6 +450,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
.setLabel(label)
.addCommonHeader()
.setEntity(entity)
.addHiddenColumns(options.isDeletable())
.addProperties(options.getStreamLoadProp());

if (enableGzCompress) {
Expand Down Expand Up @@ -488,11 +491,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException {
putBuilder.setLabel(label + "_" + retry);
reason = respContent.getMessage();
} else {
String errMsg =
String errMsg = null;
if (StringUtils.isBlank(respContent.getMessage())
&& StringUtils.isBlank(respContent.getErrorURL())) {
// sometimes stream load will not return message
errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
"stream load error, response is %s",
loadResult);
throw new DorisRuntimeException(errMsg);
} else {
errMsg =
String.format(
"stream load error: %s, see more in %s",
respContent.getMessage(),
respContent.getErrorURL());
}
throw new DorisRuntimeException(errMsg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public class DorisOptions {
private long bufferFlushMaxBytes;
private Properties streamLoadProp;
private int maxRetries;
private boolean deletable;

public DorisOptions(String fenodes, String username, String password, String database, String table, long bufferFlushMaxRows, long bufferFlushMaxBytes, Properties streamLoadProp, int maxRetries) {
public DorisOptions(String fenodes, String username, String password, String database, String table, long bufferFlushMaxRows, long bufferFlushMaxBytes, Properties streamLoadProp, int maxRetries, boolean deletable) {
this.fenodes = fenodes;
this.username = username;
this.password = password;
Expand All @@ -46,6 +47,7 @@ public DorisOptions(String fenodes, String username, String password, String dat
this.bufferFlushMaxBytes = bufferFlushMaxBytes;
this.streamLoadProp = streamLoadProp;
this.maxRetries = maxRetries;
this.deletable = deletable;
}

public String getFenodes() {
Expand Down Expand Up @@ -84,6 +86,26 @@ public int getMaxRetries() {
return maxRetries;
}

/**
 * Returns the {@code deletable} flag these options were constructed with.
 *
 * @return the stored {@code deletable} flag
 */
public boolean isDeletable() {
    return this.deletable;
}

/**
 * Returns a human-readable summary of these options, intended for log output.
 *
 * <p>The password is never printed: a non-null password is rendered as a fixed mask so
 * that logging this object cannot leak credentials. A {@code null} password is still
 * rendered as {@code null}, which keeps a missing value distinguishable from a set one.
 *
 * @return a string listing every option, with the password masked
 */
@Override
public String toString() {
    // Fixed-width mask: reveals neither the password nor its length.
    final String maskedPassword = (password == null) ? null : "******";
    return "DorisOptions{" +
        "fenodes='" + fenodes + '\'' +
        ", username='" + username + '\'' +
        ", password='" + maskedPassword + '\'' +
        ", database='" + database + '\'' +
        ", table='" + table + '\'' +
        ", bufferFlushMaxRows=" + bufferFlushMaxRows +
        ", bufferFlushMaxBytes=" + bufferFlushMaxBytes +
        ", streamLoadProp=" + streamLoadProp +
        ", maxRetries=" + maxRetries +
        ", deletable=" + deletable +
        '}';
}

/**
 * Creates a new, independent {@link Builder}.
 *
 * @return a freshly constructed builder
 */
public static Builder builder() {
    final Builder fresh = new Builder();
    return fresh;
}
Expand All @@ -98,6 +120,7 @@ public static class Builder {
private long bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES;
private int maxRetries = DEFAULT_MAX_RETRIES;
private Properties streamLoadProp = new Properties();
private boolean deletable = false;

public Builder withFenodes(String fenodes) {
this.fenodes = fenodes;
Expand Down Expand Up @@ -144,6 +167,11 @@ public Builder withMaxRetries(int maxRetries) {
return this;
}

/**
 * Records the {@code deletable} flag to be passed to the options built by this builder.
 *
 * @param deletable the flag value to store
 * @return this builder, to allow call chaining
 */
public Builder withDeletable(boolean deletable) {
    this.deletable = deletable;
    return this;
}

public DorisOptions build() {
Preconditions.checkArgument(fenodes != null, "Fenodes must not be null");
Preconditions.checkArgument(username != null, "Username must not be null");
Expand All @@ -153,7 +181,7 @@ public DorisOptions build() {
Preconditions.checkArgument(bufferFlushMaxRows >= 10000, "BufferFlushMaxRows must be greater than 10000");
Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 1024, "BufferFlushMaxBytes must be greater than 10MB");
Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be greater than 0");
return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries);
return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries, deletable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.StringJoiner;

import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.CSV;
import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.DORIS_DELETE_SIGN;
import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.JSON;
import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.NULL_VALUE;

Expand All @@ -47,13 +48,15 @@ public class DorisRecordSerializer {
private final String fieldDelimiter;
private final ValueMetaInterface[] formatMeta;
private LogChannelInterface log;
private final boolean deletable;

private DorisRecordSerializer(
String[] fieldNames,
ValueMetaInterface[] formatMeta,
String type,
String fieldDelimiter,
LogChannelInterface log) {
LogChannelInterface log,
boolean deletable) {
this.fieldNames = fieldNames;
this.type = type;
this.fieldDelimiter = fieldDelimiter;
Expand All @@ -62,6 +65,7 @@ private DorisRecordSerializer(
}
this.formatMeta = formatMeta;
this.log = log;
this.deletable = deletable;
}


Expand Down Expand Up @@ -89,6 +93,10 @@ public String buildJsonString(Object[] record, int maxIndex) throws IOException,
valueMap.put(fieldNames[fieldIndex], value);
fieldIndex++;
}
if (deletable) {
// All load data will be deleted
valueMap.put(DORIS_DELETE_SIGN, "1");
}
return objectMapper.writeValueAsString(valueMap);
}

Expand All @@ -101,6 +109,10 @@ public String buildCSVString(Object[] record, int maxIndex) throws IOException,
joiner.add(value);
fieldIndex++;
}
if (deletable) {
// All load data will be deleted
joiner.add("1");
}
return joiner.toString();
}

Expand Down Expand Up @@ -147,6 +159,7 @@ public static class Builder {
private String type;
private String fieldDelimiter;
private LogChannelInterface log;
private boolean deletable;

public Builder setFieldNames(String[] fieldNames) {
this.fieldNames = fieldNames;
Expand All @@ -173,14 +186,19 @@ public Builder setLogChannelInterface(LogChannelInterface log) {
return this;
}

/**
 * Records the {@code deletable} flag to be passed to the serializer built by this builder.
 *
 * @param deletable the flag value to store
 * @return this builder, to allow call chaining
 */
public Builder setDeletable(boolean deletable) {
    this.deletable = deletable;
    return this;
}

/**
 * Validates the builder state and creates the {@link DorisRecordSerializer}.
 *
 * <p>The configured type must be either CSV with a non-null field delimiter, or JSON.
 * {@code formatMeta} and {@code fieldNames} must both be non-null.
 *
 * @return a serializer configured from this builder's fields
 */
public DorisRecordSerializer build() {
// Accept exactly two configurations: CSV with a delimiter, or JSON (delimiter ignored).
// NOTE(review): Preconditions is presumably Guava's, i.e. failures throw
// IllegalStateException / NullPointerException — confirm against the file's imports.
Preconditions.checkState(
CSV.equals(type) && fieldDelimiter != null
|| JSON.equals(type));
Preconditions.checkNotNull(formatMeta);
Preconditions.checkNotNull(fieldNames);

// NOTE(review): two consecutive return statements follow; this looks like the removed
// and the added line of a diff shown together — only the second passes `deletable`.
return new DorisRecordSerializer(fieldNames, formatMeta, type, fieldDelimiter, log);
return new DorisRecordSerializer(fieldNames, formatMeta, type, fieldDelimiter, log, deletable);
}
}
}
Loading

0 comments on commit 3f994c1

Please sign in to comment.