Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Sep 11, 2024
1 parent 46d4567 commit 7f08826
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ public class DatabaseSyncConfig {
public static final String SINGLE_SINK = "single-sink";
////////// doris-table-conf //////////
public static final String TABLE_CONF = "table-conf";
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";

////////// date-converter-conf //////////
public static final String CONVERTERS = "converters";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import java.util.Objects;

public class DorisTableConfig implements Serializable {
private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
public static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
// PROPERTIES parameter in doris table creation statement. such as: replication_num=1.
public static final String REPLICATION_NUM = "replication_num";
public static final String TABLE_BUCKETS = "table-buckets";

private final Map<String, String> tableProperties;
// The specific parameters extracted from --table-conf need to be parsed and integrated into the
// doris table creation statement. such as: table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50".
Expand All @@ -48,10 +51,9 @@ public DorisTableConfig(Map<String, String> tableConfig) {
if (!tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
tableConfig.put(LIGHT_SCHEMA_CHANGE, Boolean.toString(true));
}
if (tableConfig.containsKey(DatabaseSyncConfig.TABLE_BUCKETS)) {
this.tableBuckets =
buildTableBucketMap(tableConfig.get(DatabaseSyncConfig.TABLE_BUCKETS));
tableConfig.remove(DatabaseSyncConfig.TABLE_BUCKETS);
if (tableConfig.containsKey(TABLE_BUCKETS)) {
this.tableBuckets = buildTableBucketMap(tableConfig.get(TABLE_BUCKETS));
tableConfig.remove(TABLE_BUCKETS);
}
tableProperties = tableConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@

package org.apache.doris.flink.container.e2e;

import org.apache.doris.flink.container.AbstractContainerTestBase;
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import org.apache.doris.flink.container.AbstractE2EService;
import org.apache.doris.flink.container.ContainerUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,19 +34,12 @@
import java.util.List;
import java.util.UUID;

public class Doris2DorisE2ECase extends AbstractE2EService {
public class Doris2DorisE2ECase extends AbstractContainerTestBase {
private static final Logger LOG = LoggerFactory.getLogger(Doris2DorisE2ECase.class);
private static final String DATABASE_SOURCE = "test_doris2doris_source";
private static final String DATABASE_SINK = "test_doris2doris_sink";
private static final String TABLE = "test_tbl";

@Before
public void setUp() throws InterruptedException {
LOG.info("Doris2DorisE2ECase attempting to acquire semaphore.");
SEMAPHORE.acquire();
LOG.info("Doris2DorisE2ECase semaphore acquired.");
}

@Test
public void testDoris2Doris() throws Exception {
LOG.info("Start executing the test case of doris to doris.");
Expand Down Expand Up @@ -163,14 +153,4 @@ private void initializeDorisTable() {
ContainerUtils.executeSQLStatement(getDorisQueryConnection(), LOG, sinkInitSql);
LOG.info("Initialization of doris table successful.");
}

@After
public void close() {
try {
// Ensure that semaphore is always released
} finally {
LOG.info("Doris2DorisE2ECase releasing semaphore.");
SEMAPHORE.release();
}
}
}

0 comments on commit 7f08826

Please sign in to comment.