Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Apr 17, 2024
1 parent 1f46522 commit 009fd53
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void eval(Object... keys) {
Row keyRow = Row.of(keys);
List<RowData> curList = cacheMap.get(keyRow);
if (curList != null) {
curList.parallelStream().forEach(this::collect);
curList.forEach(this::collect);
}
}

Expand All @@ -96,13 +97,14 @@ private void reloadData() {
LOG.info("Populating lookup join cache");
}
cacheMap.clear();

StringBuilder sqlSb = new StringBuilder("select * from ");
sqlSb.append("`").append(sourceOptions.getDatabaseName()).append("`");
sqlSb.append(".");
sqlSb.append("`" + sourceOptions.getTableName() + "`");
LOG.info("LookUpFunction SQL [{}]", sqlSb.toString());
this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, sqlSb.toString());

String columns = Arrays.stream(selectColumns)
.map(col -> "`" + col.getColumnName() + "`")
.collect(Collectors.joining(","));
String sql = String.format("select %s from `%s`.`%s`", columns,
sourceOptions.getDatabaseName(), sourceOptions.getTableName());
LOG.info("LookUpFunction SQL [{}]", sql);
this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, sql);
List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
cacheMap = lists.get(0).parallelStream()
.flatMap(beXTablets -> scanBeTablets(beXTablets).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private String genSQL(StarRocksSourceQueryType queryType, SelectColumn[] selectC
case QueryAllColumns:
case QuerySomeColumns:
String columns = Arrays.stream(selectColumns)
.map(SelectColumn::getColumnName)
.map(col -> "`" + col.getColumnName() + "`")
.collect(Collectors.joining(","));
sqlSb.append(columns);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package com.starrocks.connector.flink.it.source;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import com.alibaba.fastjson.JSONObject;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import com.starrocks.connector.flink.table.source.StarrocksExternalServiceImpl;
Expand All @@ -26,8 +29,6 @@
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;
import com.starrocks.shade.org.apache.thrift.transport.TTransportFactory;
import com.starrocks.thrift.TStarrocksExternalService;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.junit.After;
import org.junit.Before;

Expand All @@ -38,8 +39,10 @@
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;

Expand All @@ -66,7 +69,6 @@ public abstract class StarRocksSourceBaseTest {
private ServerSocket thriftSocket;
protected int AVAILABLE_THRIFT_PORT = 21592;
protected String mockResonse = "";
protected String querySQL = "select * from `test`.`test_source`";
protected int tabletCount = 50;

@Before
Expand Down Expand Up @@ -174,7 +176,9 @@ public void run() {
sb.append((char) bd.read());
}
}
assertEquals("{\"sql\":\""+ querySQL +"\"}", sb.toString());


assertEquals("{\"sql\":\"" + getQuerySql() + "\"}", sb.toString());
PrintWriter pw = new PrintWriter(socket.getOutputStream());
pw.println("HTTP/1.1 200 OK");
pw.println("Content-type:application/json");
Expand Down Expand Up @@ -209,6 +213,12 @@ public void createThriftServer() {
thriftThread.start();
}

protected String getQuerySql() {
return String.format("select %s from `test`.`test_source`",
Arrays.stream(TABLE_SCHEMA.getFieldNames()).map(col -> "`" + col + "`")
.collect(Collectors.joining(",")));
}

protected void mockResonsefunc() {
String[] beNode = new String[]{
"127.0.0.1:" + AVAILABLE_THRIFT_PORT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class StarRocksQueryPlanVisitorTest extends StarRocksSourceBaseTest {
public void testGetQueryPlan() throws IOException {
mockResonsefunc();
StarRocksQueryPlanVisitor visitor = new StarRocksQueryPlanVisitor(OPTIONS);
QueryInfo queryInfo = visitor.getQueryInfo(querySQL);
QueryInfo queryInfo = visitor.getQueryInfo(getQuerySql());
List<Long> tabletsList = new ArrayList<>();
List<Integer> countList = new ArrayList<>();
queryInfo.getBeXTablets().forEach(beXTablets -> {
Expand Down

0 comments on commit 009fd53

Please sign in to comment.