Skip to content

Commit

Permalink
[server] Revert the queue in RequestChannel from PriorityBlockingQueu…
Browse files Browse the repository at this point in the history
…e back to ArrayBlockingQueue to fix config 'netty.server.max-queued-requests' error (#276)
  • Loading branch information
swuferhong authored Jan 16, 2025
1 parent 876043e commit 052492f
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import javax.annotation.concurrent.ThreadSafe;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

/** A blocking queue channel that can receive requests and send responses. */
Expand All @@ -33,18 +33,7 @@ public final class RequestChannel {
private final BlockingQueue<RpcRequest> requestQueue;

public RequestChannel(int queueCapacity) {
this.requestQueue =
new PriorityBlockingQueue<>(
queueCapacity,
(req1, req2) -> {
// less value will be popped first
int res = Integer.compare(req2.getPriority(), req1.getPriority());
// if priority is same, we want to keep FIFO
if (res == 0 && req1 != req2) {
res = (req1.getRequestId() < req2.getRequestId() ? -1 : 1);
}
return res;
});
this.requestQueue = new ArrayBlockingQueue<>(queueCapacity);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package com.alibaba.fluss.rpc.netty.server;

import com.alibaba.fluss.rpc.messages.ApiMessage;
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
import com.alibaba.fluss.rpc.protocol.ApiKeys;
import com.alibaba.fluss.rpc.protocol.ApiMethod;
import com.alibaba.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import com.alibaba.fluss.shaded.netty4.io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -108,23 +106,6 @@ public long getStartTimeMs() {
return startTimeMs;
}

/**
* Get the request priority in {@link RequestChannel}. The higher priority of this request, the
* higher the result will be.
*
* <p>Currently, we only consider the FetchLogRequest from follower as high priority in order to
* make sure the data is replicated to the follower as soon as possible, which can maintain the
* stability of the cluster when the network load is high.
*/
public int getPriority() {
if (apiKey == ApiKeys.FETCH_LOG.id
&& ((FetchLogRequest) message).getFollowerServerId() >= 0) {
return 1;
}

return 0;
}

@Override
public String toString() {
return "RpcRequest{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
public class RequestChannelTest {

@Test
void testRequestPriority() throws Exception {
void testRequestsFIFO() throws Exception {
RequestChannel channel = new RequestChannel(100);

// 1. request with same priority score. Use FIFO.
// 1. Same request type, Use FIFO.
List<RpcRequest> rpcRequests = new ArrayList<>();
// push rpc requests
for (int i = 0; i < 100; i++) {
Expand All @@ -59,7 +59,7 @@ void testRequestPriority() throws Exception {
assertThat(gotRequest).isEqualTo(rpcRequests.get(i));
}

// 2. request with different priority score. Should be ordered by priority score.
// 2. Different request type, Use FIFO.
RpcRequest rpcRequest1 =
new RpcRequest(
ApiKeys.GET_TABLE.id,
Expand All @@ -81,8 +81,8 @@ void testRequestPriority() throws Exception {
channel.putRequest(rpcRequest1);
channel.putRequest(rpcRequest2);
RpcRequest rpcRequest = channel.pollRequest(100);
assertThat(rpcRequest).isEqualTo(rpcRequest2);
rpcRequest = channel.pollRequest(100);
assertThat(rpcRequest).isEqualTo(rpcRequest1);
rpcRequest = channel.pollRequest(100);
assertThat(rpcRequest).isEqualTo(rpcRequest2);
}
}

0 comments on commit 052492f

Please sign in to comment.