diff --git a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java index c43e3e9e68aaf3..85dc097be4e69a 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java @@ -91,6 +91,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws .setFormatMeta(data.formatMeta) .setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT)) .setLogChannelInterface(log) + .setDeletable(options.isDeletable()) .build(); } @@ -120,6 +121,8 @@ private void closeOutput() throws Exception { public boolean init( StepMetaInterface smi, StepDataInterface sdi ) { meta = (DorisStreamLoaderMeta) smi; data = (DorisStreamLoaderData) sdi; + logDebug("Initializing step with meta : " + meta.toString()); + if (super.init(smi, sdi)){ Properties streamHeaders = new Properties(); String streamLoadProp = meta.getStreamLoadProp(); @@ -141,7 +144,10 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) { .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes()) .withBufferFlushMaxRows(meta.getBufferFlushMaxRows()) .withMaxRetries(meta.getMaxRetries()) - .withStreamLoadProp(streamHeaders).build(); + .withStreamLoadProp(streamHeaders) + .withDeletable(meta.isDeletable()).build(); + + logDetailed("Initializing step with options: " + options.toString()); streamLoad = new DorisBatchStreamLoad(options, log); return true; } diff --git a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoaderMeta.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoaderMeta.java index c30ff83f025ef6..12d72d838afd1d 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoaderMeta.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoaderMeta.java @@ -53,7 +53,7 @@ description = "BaseStep.TypeTooltipDesc.DorisStreamLoader", categoryDescription = "i18n:org.pentaho.di.trans.step:BaseStep.Category.Bulk", image = "doris.svg", - documentationUrl = "https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual/", + documentationUrl = "https://doris.apache.org/docs/dev/ecosystem/kettle/", i18nPackageName = "org.pentaho.di.trans.steps.dorisstreamloader" ) @InjectionSupported( localizationPrefix = "DorisStreamLoader.Injection.", groups = { "FIELDS" } ) public class DorisStreamLoaderMeta extends BaseStepMeta implements StepMetaInterface { @@ -84,6 +84,8 @@ public class DorisStreamLoaderMeta extends BaseStepMeta implements StepMetaInter private int maxRetries; + private boolean deletable; + /** Field name of the target table */ @Injection( name = "FIELD_TABLE", group = "FIELDS" ) private String[] fieldTable; @@ -111,8 +113,8 @@ private void readData( Node stepnode, List data bufferFlushMaxRows = Long.valueOf(XMLHandler.getTagValue(stepnode, "bufferFlushMaxRows")); bufferFlushMaxBytes = Long.valueOf(XMLHandler.getTagValue(stepnode, "bufferFlushMaxBytes")); maxRetries = Integer.valueOf(XMLHandler.getTagValue(stepnode, "maxRetries")); - streamLoadProp = XMLHandler.getTagValue(stepnode, "streamLoadProp"); + deletable = "Y".equalsIgnoreCase(XMLHandler.getTagValue(stepnode, "deletable")); // Field data mapping int nrvalues = XMLHandler.countNodes(stepnode, "mapping"); @@ -145,7 +147,7 @@ public void setDefault() { bufferFlushMaxBytes = 10 * 1024 * 1024; maxRetries = 3; streamLoadProp = "format:json;read_json_by_line:true"; - + deletable = false; allocate(0); } @@ -161,6 +163,7 @@ public String getXML() { retval.append(" ").append(XMLHandler.addTagValue("bufferFlushMaxBytes", bufferFlushMaxBytes)); retval.append(" ").append(XMLHandler.addTagValue("maxRetries", maxRetries)); retval.append(" ").append(XMLHandler.addTagValue("streamLoadProp", streamLoadProp)); + retval.append(" ").append(XMLHandler.addTagValue("deletable", deletable)); for (int i = 0; i < fieldTable.length; i++) { retval.append(" ").append(Const.CR); @@ -189,6 +192,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis maxRetries = Integer.valueOf(rep.getStepAttributeString(id_step, "maxRetries")); streamLoadProp = rep.getStepAttributeString(id_step, "streamLoadProp"); + deletable = rep.getStepAttributeBoolean(id_step, "deletable"); int nrvalues = rep.countNrStepAttributes(id_step, "stream_name"); allocate(nrvalues); @@ -217,6 +221,7 @@ public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transform rep.saveStepAttribute(id_transformation, id_step, "bufferFlushMaxRows", bufferFlushMaxRows); rep.saveStepAttribute(id_transformation, id_step, "bufferFlushMaxBytes", bufferFlushMaxBytes); rep.saveStepAttribute(id_transformation, id_step, "maxRetries", maxRetries); + rep.saveStepAttribute(id_transformation, id_step, "deletable", deletable); for (int i = 0; i < fieldTable.length; i++) { rep.saveStepAttribute(id_transformation, id_step, i, "stream_name", fieldTable[i]); @@ -328,7 +333,15 @@ public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } - public String[] getFieldTable() { + public boolean isDeletable() { + return deletable; + } + + public void setDeletable(boolean deletable) { + this.deletable = deletable; + } + + public String[] getFieldTable() { return fieldTable; } @@ -361,6 +374,7 @@ public String toString() { ", bufferFlushMaxRows=" + bufferFlushMaxRows + ", bufferFlushMaxBytes=" + bufferFlushMaxBytes + ", maxRetries=" + maxRetries + + ", deletable=" + deletable + ", fieldTable=" + Arrays.toString(fieldTable) + ", fieldStream=" + Arrays.toString(fieldStream) + '}'; diff --git a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java index 8fccd0e22dd4fb..274c494c4815e8 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java @@ -18,6 +18,7 @@ package org.pentaho.di.trans.steps.dorisstreamloader.load; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.StringUtils; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; @@ -203,6 +204,7 @@ public synchronized void writeRecord(String database, String table, byte[] recor lock.lock(); try { while (currentCacheBytes.get() >= maxBlockedBytes) { + checkFlushException(); log.logDetailed( "Cache full, waiting for flush, currentBytes: " + currentCacheBytes.get() + ", maxBlockedBytes: " + maxBlockedBytes); @@ -448,6 +450,7 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { .setLabel(label) .addCommonHeader() .setEntity(entity) + .addHiddenColumns(options.isDeletable()) .addProperties(options.getStreamLoadProp()); if (enableGzCompress) { @@ -488,11 +491,22 @@ public void load(String label, BatchRecordBuffer buffer) throws IOException { putBuilder.setLabel(label + "_" + retry); reason = respContent.getMessage(); } else { - String errMsg = + String errMsg = null; + if (StringUtils.isBlank(respContent.getMessage()) + && StringUtils.isBlank(respContent.getErrorURL())) { + // sometimes stream load will not return message + errMsg = String.format( - "stream load error: %s, see more in %s", - respContent.getMessage(), - respContent.getErrorURL()); + "stream load error, response is %s", + loadResult); + throw new DorisRuntimeException(errMsg); + } else { + errMsg = + String.format( + "stream load error: %s, see more in %s", + respContent.getMessage(), + respContent.getErrorURL()); + } throw new DorisRuntimeException(errMsg); } } diff --git a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java index 4e11e6a1ef4ae9..202b996cafe914 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java @@ -35,8 +35,9 @@ public class DorisOptions { private long bufferFlushMaxBytes; private Properties streamLoadProp; private int maxRetries; + private boolean deletable; - public DorisOptions(String fenodes, String username, String password, String database, String table, long bufferFlushMaxRows, long bufferFlushMaxBytes, Properties streamLoadProp, int maxRetries) { + public DorisOptions(String fenodes, String username, String password, String database, String table, long bufferFlushMaxRows, long bufferFlushMaxBytes, Properties streamLoadProp, int maxRetries, boolean deletable) { this.fenodes = fenodes; this.username = username; this.password = password; @@ -46,6 +47,7 @@ public DorisOptions(String fenodes, String username, String password, String dat this.bufferFlushMaxBytes = bufferFlushMaxBytes; this.streamLoadProp = streamLoadProp; this.maxRetries = maxRetries; + this.deletable = deletable; } public String getFenodes() { @@ -84,6 +86,26 @@ public int getMaxRetries() { return maxRetries; } + public boolean isDeletable() { + return deletable; + } + + @Override + public String toString() { + return "DorisOptions{" + + "fenodes='" + fenodes + '\'' + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + ", database='" + database + '\'' + + ", table='" + table + '\'' + + ", bufferFlushMaxRows=" + bufferFlushMaxRows + + ", bufferFlushMaxBytes=" + bufferFlushMaxBytes + + ", streamLoadProp=" + streamLoadProp + + ", maxRetries=" + maxRetries + + ", deletable=" + deletable + + '}'; + } + public static Builder builder() { return new Builder(); } @@ -98,6 +120,7 @@ public static class Builder { private long bufferFlushMaxBytes = DEFAULT_BUFFER_FLUSH_MAX_BYTES; private int maxRetries = DEFAULT_MAX_RETRIES; private Properties streamLoadProp = new Properties(); + private boolean deletable = false; public Builder withFenodes(String fenodes) { this.fenodes = fenodes; @@ -144,6 +167,11 @@ public Builder withMaxRetries(int maxRetries) { return this; } + public Builder withDeletable(boolean deletable) { + this.deletable = deletable; + return this; + } + public DorisOptions build() { Preconditions.checkArgument(fenodes != null, "Fenodes must not be null"); Preconditions.checkArgument(username != null, "Username must not be null"); @@ -153,7 +181,7 @@ public DorisOptions build() { Preconditions.checkArgument(bufferFlushMaxRows >= 10000, "BufferFlushMaxRows must be greater than 10000"); Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 * 1024, "BufferFlushMaxBytes must be greater than 10MB"); Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be greater than 0"); - return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries); + return new DorisOptions(fenodes, username, password, database, table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries, deletable); } } } diff --git a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/serializer/DorisRecordSerializer.java b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/serializer/DorisRecordSerializer.java index 22caa643dc5325..8a40fbbac20d65 100644 --- a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/serializer/DorisRecordSerializer.java +++ b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/serializer/DorisRecordSerializer.java @@ -35,6 +35,7 @@ import java.util.StringJoiner; import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.CSV; +import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.DORIS_DELETE_SIGN; import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.JSON; import static org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.NULL_VALUE; @@ -47,13 +48,15 @@ public class DorisRecordSerializer { private final String fieldDelimiter; private final ValueMetaInterface[] formatMeta; private LogChannelInterface log; + private final boolean deletable; private DorisRecordSerializer( String[] fieldNames, ValueMetaInterface[] formatMeta, String type, String fieldDelimiter, - LogChannelInterface log) { + LogChannelInterface log, + boolean deletable) { this.fieldNames = fieldNames; this.type = type; this.fieldDelimiter = fieldDelimiter; @@ -62,6 +65,7 @@ private DorisRecordSerializer( } this.formatMeta = formatMeta; this.log = log; + this.deletable = deletable; } @@ -89,6 +93,10 @@ public String buildJsonString(Object[] record, int maxIndex) throws IOException, valueMap.put(fieldNames[fieldIndex], value); fieldIndex++; } + if (deletable) { + // All load data will be deleted + valueMap.put(DORIS_DELETE_SIGN, "1"); + } return objectMapper.writeValueAsString(valueMap); } @@ -101,6 +109,10 @@ public String buildCSVString(Object[] record, int maxIndex) throws IOException, joiner.add(value); fieldIndex++; } + if (deletable) { + // All load data will be deleted + joiner.add("1"); + } return joiner.toString(); } @@ -147,6 +159,7 @@ public static class Builder { private String type; private String fieldDelimiter; private LogChannelInterface log; + private boolean deletable; public Builder setFieldNames(String[] fieldNames) { this.fieldNames = fieldNames; @@ -173,6 +186,11 @@ public Builder setLogChannelInterface(LogChannelInterface log) { return this; } + public Builder setDeletable(boolean deletable) { + this.deletable = deletable; + return this; + } + public DorisRecordSerializer build() { Preconditions.checkState( CSV.equals(type) && fieldDelimiter != null @@ -180,7 +198,7 @@ public DorisRecordSerializer build() { Preconditions.checkNotNull(formatMeta); Preconditions.checkNotNull(fieldNames); - return new DorisRecordSerializer(fieldNames, formatMeta, type, fieldDelimiter, log); + return new DorisRecordSerializer(fieldNames, formatMeta, type, fieldDelimiter, log, deletable); } } } diff --git a/extension/kettle/ui/src/main/java/org/pentaho/di/ui/trans/steps/dorisstreamloader/DorisStreamLoaderDialog.java b/extension/kettle/ui/src/main/java/org/pentaho/di/ui/trans/steps/dorisstreamloader/DorisStreamLoaderDialog.java index 41321598304aea..c7ab7c76843aef 100644 --- a/extension/kettle/ui/src/main/java/org/pentaho/di/ui/trans/steps/dorisstreamloader/DorisStreamLoaderDialog.java +++ b/extension/kettle/ui/src/main/java/org/pentaho/di/ui/trans/steps/dorisstreamloader/DorisStreamLoaderDialog.java @@ -72,14 +72,13 @@ * Dialog class for the Doris stream loader step. */ @PluginDialog(id = "DorisStreamLoaderStep", image = "doris.svg", pluginType = PluginDialog.PluginType.STEP, - documentationUrl = "https://doris.apache.org/docs/dev/data-operate/import/import-way/stream-load-manual/") + documentationUrl = "https://doris.apache.org/docs/dev/ecosystem/kettle/") @InjectionSupported(localizationPrefix = "DorisKettleConnector.Injection.", groups = {"FIELDS"}) public class DorisStreamLoaderDialog extends BaseStepDialog implements StepDialogInterface { private static Class PKG = DorisStreamLoaderDialog.class; // for i18n purposes, needed by Translator2!! private DorisStreamLoaderMeta input; - private Label wlFenodes; private TextVar wFenodes; private FormData fdlFenodes, fdFenodes; @@ -117,6 +116,10 @@ public class DorisStreamLoaderDialog extends BaseStepDialog implements StepDialo private TextVar wMaxRetries; private FormData fdlMaxRetries, fdMaxRetries; + private Label wlDeletable; + private Button wDeletable; + private FormData fdlDeletable, fdDeletable; + private Label wlReturn; private TableView wReturn; private FormData fdlReturn, fdReturn; @@ -347,7 +350,7 @@ public void focusLost(FocusEvent focusEvent) { wMaxRetries = new TextVar(transMeta, shell, SWT.SINGLE | SWT.LEFT | SWT.BORDER); props.setLook(wMaxRetries); wMaxRetries.addModifyListener(lsMod); - wPassword.addFocusListener(lsFocusLost); + wMaxRetries.addFocusListener(lsFocusLost); fdMaxRetries = new FormData(); fdMaxRetries.left = new FormAttachment(middle, 0); fdMaxRetries.right = new FormAttachment(100, 0); @@ -374,6 +377,31 @@ public void focusLost(FocusEvent focusEvent) { fdStreamLoadProp.top = new FormAttachment(wMaxRetries, margin * 2); wStreamLoadProp.setLayoutData(fdStreamLoadProp); + //deletable line ... + wlDeletable = new Label(shell, SWT.RIGHT); + wlDeletable.setText(BaseMessages.getString(PKG, "DorisStreamLoaderDialog.Deletable.Label")); + props.setLook(wlDeletable); + fdlDeletable = new FormData(); + fdlDeletable.left = new FormAttachment(0, 0); + fdlDeletable.right = new FormAttachment(middle, -margin); + fdlDeletable.top = new FormAttachment(wStreamLoadProp, margin * 2); + wlDeletable.setLayoutData(fdlDeletable); + + wDeletable = new Button(shell, SWT.CHECK | SWT.LEFT); + props.setLook(wDeletable); + wDeletable.setSelection(false); + fdDeletable = new FormData(); + fdDeletable.left = new FormAttachment(middle, 0); + fdDeletable.right = new FormAttachment(100, 0); + fdDeletable.top = new FormAttachment(wStreamLoadProp, margin * 2); + wDeletable.setLayoutData(fdDeletable); + wDeletable.addSelectionListener(new SelectionAdapter() { + @Override + public void widgetSelected(SelectionEvent selectionEvent) { + input.setChanged(); + } + }); + // OK and cancel buttons wOK = new Button( shell, SWT.PUSH ); wOK.setText( BaseMessages.getString( PKG, "System.Button.OK" ) ); @@ -387,7 +415,7 @@ public void focusLost(FocusEvent focusEvent) { props.setLook(wlReturn); fdlReturn = new FormData(); fdlReturn.left = new FormAttachment(0, 0); - fdlReturn.top = new FormAttachment(wStreamLoadProp, margin); + fdlReturn.top = new FormAttachment(wDeletable, margin); wlReturn.setLayoutData(fdlReturn); int UpInsCols = 2; @@ -537,6 +565,7 @@ private void getData(){ wBufferFlushMaxRows.setText(Const.NVL(String.valueOf(input.getBufferFlushMaxRows()),"50000")); wBufferFlushMaxBytes.setText(Const.NVL(String.valueOf(input.getBufferFlushMaxBytes()),"104857600")); wMaxRetries.setText(Const.NVL(String.valueOf(input.getMaxRetries()),"3")); + wDeletable.setSelection(input.isDeletable()); if (input.getFieldTable() != null) { for (int i = 0; i < input.getFieldTable().length; i++) { @@ -706,6 +735,7 @@ private void getInfo(DorisStreamLoaderMeta inf) { inf.setBufferFlushMaxBytes(Long.valueOf(wBufferFlushMaxBytes.getText())); inf.setMaxRetries(Integer.valueOf(wMaxRetries.getText())); inf.setStreamLoadProp(wStreamLoadProp.getText()); + inf.setDeletable(wDeletable.getSelection()); stepname = wStepname.getText(); diff --git a/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_en_US.properties b/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_en_US.properties index d172f8a824d468..5dcae422fb1172 100644 --- a/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_en_US.properties +++ b/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_en_US.properties @@ -27,6 +27,7 @@ DorisStreamLoaderDialog.StreamLoadProp.Label=StreamLoad Properties DorisStreamLoaderDialog.BufferFlushMaxRows.Label=Maximum rows for load DorisStreamLoaderDialog.BufferFlushMaxBytes.Label=Maximum bytes for load DorisStreamLoaderDialog.MaxRetries.Label=Load retries +DorisStreamLoaderDialog.Deletable.Label=Delete Mode DorisStreamLoaderDialog.Fields.Label=Fields to load\: DorisStreamLoaderDialog.ColumnInfo.TableField=Table field DorisStreamLoaderDialog.ColumnInfo.StreamField=Stream field diff --git a/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_zh_CN.properties b/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_zh_CN.properties index dc978f8d50ea66..f07f27a8c1e889 100644 --- a/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_zh_CN.properties +++ b/extension/kettle/ui/src/main/resources/org/pentaho/di/ui/trans/steps/dorisstreamloader/messages/messages_zh_CN.properties @@ -27,6 +27,7 @@ DorisStreamLoaderDialog.StreamLoadProp.Label=Stream Load\u5c5e\u6027 DorisStreamLoaderDialog.BufferFlushMaxRows.Label=\u5355\u6b21\u5bfc\u5165\u6700\u5927\u884c\u6570 DorisStreamLoaderDialog.BufferFlushMaxBytes.Label=\u5355\u6b21\u5bfc\u5165\u6700\u5927\u5b57\u8282 DorisStreamLoaderDialog.MaxRetries.Label=\u5bfc\u5165\u91cd\u8bd5\u6b21\u6570 +DorisStreamLoaderDialog.Deletable.Label=\u5220\u9664\u6a21\u5f0f DorisStreamLoaderDialog.Fields.Label=\u8981\u52a0\u8f7d\u7684\u5b57\u6bb5\: DorisStreamLoaderDialog.ColumnInfo.TableField=\u8868\u5b57\u6bb5 DorisStreamLoaderDialog.ColumnInfo.StreamField=\u6d41\u5b57\u6bb5