Skip to content

Commit

Permalink
[pick](branch-3.0) pick #44177 #44286 #44992 #46217 (#46842)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

```
[fix](arrow-flight-sql) Fix conf public_host and arrow_flight_sql_proxy_port #44177
[fix](arrow-flight-sql) Arrow Flight support multiple endpoints #44286
[fix](arrow-flight-sql) Revert arrow_flight_sql group from regression test #44992
[opt](arrow-flight-sql) Support arrow-flight-sql protocol getStreamCatalogs, getStreamSchemas, getStreamTables #46217
```
  • Loading branch information
xinyiZzz authored Jan 13, 2025
1 parent 8bfe07e commit fe1bfe7
Show file tree
Hide file tree
Showing 53 changed files with 767 additions and 216 deletions.
25 changes: 23 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,29 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "-1");

DEFINE_mString(public_access_ip, "");
DEFINE_Int32(public_access_port, "-1");
// If external clients cannot directly access the BE's priority_networks address, set public_host to a host
// that is accessible to external clients.
// There are usually two usage scenarios:
// 1. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network.
// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
// randomly routed to a Doris BE node when connecting to Nginx. In this case, set public_host to the host of Nginx.
// 2. If priority_networks is an internal network IP and the BE node has its own independent external IP,
// but Doris currently does not support modifying priority_networks, set public_host to the real external IP.
DEFINE_mString(public_host, "");

// If the BE node is connected to the external network through a reverse proxy like Nginx
// and needs to use Arrow Flight SQL, add a server in Nginx that reverse proxies
// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
// upstream arrowflight {
// server 10.16.10.8:8069;
// server 10.16.10.8:8068;
//}
// server {
// listen 8167 http2;
// listen [::]:8167 http2;
// server_name doris.arrowflight.com;
// }
DEFINE_Int32(arrow_flight_sql_proxy_port, "-1");

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down
28 changes: 23 additions & 5 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,29 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// If priority_networks is incorrect but cannot be modified, set public_access_ip as BE’s real IP.
// For ADBC client fetch result, default is empty, the ADBC client uses the backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to modify the fetch result ip.
DECLARE_mString(public_access_ip);
DECLARE_Int32(public_access_port);
// If external clients cannot directly access the BE's priority_networks address, set public_host to a host
// that is accessible to external clients.
// There are usually two usage scenarios:
// 1. In a production environment, it is often inconvenient to expose Doris BE nodes to the external network.
// However, a reverse proxy (such as Nginx) can be added to all Doris BE nodes, and the external client will be
// randomly routed to a Doris BE node when connecting to Nginx. In this case, set public_host to the host of Nginx.
// 2. If priority_networks is an internal network IP and the BE node has its own independent external IP,
// but Doris currently does not support modifying priority_networks, set public_host to the real external IP.
DECLARE_mString(public_host);

// If the BE node is connected to the external network through a reverse proxy like Nginx
// and needs to use Arrow Flight SQL, add a server in Nginx that reverse proxies
// `Nginx:arrow_flight_sql_proxy_port` to `BE_priority_networks:arrow_flight_sql_port`. For example:
// upstream arrowflight {
// server 10.16.10.8:8069;
// server 10.16.10.8:8068;
//}
// server {
// listen 8167 http2;
// listen [::]:8167 http2;
// server_name doris.arrowflight.com;
// }
DECLARE_Int32(arrow_flight_sql_proxy_port);

// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
Expand Down
8 changes: 5 additions & 3 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,9 +908,11 @@ void PInternalService::fetch_arrow_flight_schema(google::protobuf::RpcController
st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
if (!config::public_access_ip.empty() && config::public_access_port != -1) {
result->set_be_arrow_flight_ip(config::public_access_ip);
result->set_be_arrow_flight_port(config::public_access_port);
if (!config::public_host.empty()) {
result->set_be_arrow_flight_ip(config::public_host);
}
if (config::arrow_flight_sql_proxy_port != -1) {
result->set_be_arrow_flight_port(config::arrow_flight_sql_proxy_port);
}
}
st.to_protobuf(result->mutable_status());
Expand Down
40 changes: 8 additions & 32 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
Expand Down Expand Up @@ -61,11 +60,11 @@
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
Expand Down Expand Up @@ -131,10 +130,7 @@ public enum ConnectType {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
Expand Down Expand Up @@ -702,36 +698,16 @@ public String getRunningQuery() {
return runningQuery;
}

public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) {
this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}

public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
return flightSqlEndpointsLocations;
}

public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}

public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}

public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}

public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}

public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}

public TUniqueId getFinstId() {
return finstId;
public void clearFlightSqlEndpointsLocations() {
flightSqlEndpointsLocations.clear();
}

public void setReturnResultFromLocal(boolean returnResultFromLocal) {
Expand Down
32 changes: 16 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
Expand Down Expand Up @@ -731,29 +732,27 @@ private void execInternal() throws Exception {
enableParallelResultSink = queryOptions.isEnableParallelOutfile();
}

TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
Set<TNetworkAddress> addrs = new HashSet<>();
for (FInstanceExecParam param : topParams.instanceExecParams) {
if (addrs.contains(param.host)) {
continue;
}
addrs.add(param.host);
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
}

if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
if (enableParallelResultSink) {
context.setFinstId(queryId);
if (context.isReturnResultFromLocal()) {
receivers.add(new ResultReceiver(queryId, param.instanceId, addressToBackendID.get(param.host),
toBrpcHost(param.host), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver(), enableParallelResultSink));
} else {
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
TUniqueId finstId;
if (enableParallelResultSink) {
finstId = queryId;
} else {
finstId = topParams.instanceExecParams.get(0).instanceId;
}
context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(param.host), toBrpcHost(param.host), fragments.get(0).getOutputExprs()));
}
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}

LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
Expand All @@ -764,7 +763,8 @@ private void execInternal() throws Exception {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
.getBroker(topResultFileSink.getBrokerName(),
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {
Expand Down
Loading

0 comments on commit fe1bfe7

Please sign in to comment.