Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

请教Connection的requestQueue设计问题 #55

Open
iSenninha opened this issue Feb 10, 2019 · 4 comments
Open

请教Connection的requestQueue设计问题 #55

iSenninha opened this issue Feb 10, 2019 · 4 comments

Comments

@iSenninha
Copy link

Connection里的requestQueue队列的设计用意是在网络不通的时候缓存请求,网络通畅的时候再去消费的吧?

生产者代码会在RpcChannel#doTransport调用:

        if (!connection.getFuture().isSuccess()) {
            try {
                connection.produceRequest(state);
            } catch (IllegalStateException e) {
                RpcClientCallState callState = rpcClient.removePendingRequest(correlationId);
                if (callState != null) {
                    callState.handleFailure(e.getMessage());
                    LOG.log(Level.FINE, "id:" + correlationId + " is put in the queue");
                }
            }
        }

消费代码是注册在ChannelPoolObjectFactory#wrap回调里:

    @Override
    public PooledObject<Connection> wrap(Connection connection) {
        InetSocketAddress address;
        if (host == null) {
            address = new InetSocketAddress(port);
        } else {
            address = new InetSocketAddress(host, port);
        }
        ChannelFuture future = this.rpcClient.connect(address);

        // Wait until the connection is made successfully.
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            LOGGER.log(Level.SEVERE, "failed to get result from stp", future.cause());
        } else {
            connection.setIsConnected(true);
        }

        future.addListener(new RpcChannelFutureListener(connection));
        connection.setFuture(future);
        
        return new DefaultPooledObject<Connection>(connection);
    }

1.如果是非短连接的情况下,因为缓存池会校验连接是否成功,所以第一步生产者代码都跑不进去,这个理解没问题吧?

public boolean validateObject(PooledObject<Connection> p) {
        Connection c = p.getObject();
        Channel channel = c.getFuture().channel();
        return channel.isOpen() && channel.isActive();

    }

2.那么考虑短连接的情况,断点跟踪,在服务端无法连通的情况下,确实会进入生产者代码,但是,消费者代码真的会跑进回调代码去消费吗?

future.awaitUninterruptibly()会一直等到这个future的结果,也就是拿到了这个连接的结果(连接成功 / 失败),接下去的注册future回调直接就会跑到EventLoop里“同步”执行,如果这个awaitUninterruptibly操作失败了,后续生产者生产缓存请求,但是后续也不会再去激活回调去消费缓存代码了吧?

也就是说:

  • 如果连接是可用的,这个队列不会去缓存请求;
  • 如果这个连接不可用,这个队列是会去缓存请求,但是无法被消费;

所以Connection里的这个任务缓存队列,是否都像是一个多余的设计?或者可能是我没有考虑到其他的情况。

另一方面,如果这个队列设计是work的情况下,用ArrayBlockingQueue,在大量可用连接存在的情况,是否存在浪费内存的嫌疑,能否考虑用LinkedBlockingQueue替代?

以上,谢谢~

@xiemalin
Copy link
Contributor

感谢反馈。
1。 这个设计目标并不单指连接不可用时,而且是在连接繁忙,占用的情况采用异步的方式发送,以提前吞吐能力的。
2。 这个设计只针对长链接的方式才有效。
3。 validateObject是使用者可设置开启或关闭的, 这一块应用的common-pool的实现

@iSenninha
Copy link
Author

感谢反馈。
1。 这个设计目标并不单指连接不可用时,而且是在连接繁忙,占用的情况采用异步的方式发送,以提前吞吐能力的。
2。 这个设计只针对长链接的方式才有效。
3。 validateObject是使用者可设置开启或关闭的, 这一块应用的common-pool的实现

感谢解答!还有点疑问:
1.连接繁忙,占用 是指建立连接的过程繁忙吗?如果是的话

        // Wait until the connection is made successfully.
        future.awaitUninterruptibly();

上面这个是会同步等待到整个连接建立完成/失败吧?

@xiemalin
Copy link
Contributor

不是建立链接过程繁忙,因为我们使用的是连接池的方案,已经建立完成的链接,有可能出现短时不可用的情况,这个影响因素很多

@iSenninha
Copy link
Author

嗯嗯,其实我疑问的根源是,这个队列的消费是注册在future的回调的,这个future是建立连接这个操作的future吧?建立链接完成后出现短时不可用,我们把请求丢在缓存队列中,这个链接恢复后,这个future还会去回调去消费吗?

    @Override
    public PooledObject<Connection> wrap(Connection connection) {
        InetSocketAddress address;
        if (host == null) {
            address = new InetSocketAddress(port);
        } else {
            address = new InetSocketAddress(host, port);
        }
        ChannelFuture future = this.rpcClient.connect(address);

        // Wait until the connection is made successfully.
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            LOGGER.log(Level.SEVERE, "failed to get result from stp", future.cause());
        } else {
            connection.setIsConnected(true);
        }

       // 建立网络链接完成后,短时不可用恢复后,这个future还会有回调吗?
        future.addListener(new RpcChannelFutureListener(connection));
        connection.setFuture(future);
        
        return new DefaultPooledObject<Connection>(connection);
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants