Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compatible with CompleteFuture asynchronous programming #1323

Closed
wants to merge 36 commits into from
Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
19cb982
feat[Future]:compatible java1.8 CompletableFuture
Apr 4, 2023
f5eb764
feat[Future]:compatible java1.8 CompletableFuture
Apr 4, 2023
682ac40
feat[Future]:compatible java1.8 CompletableFuture
Apr 4, 2023
6fe8c4a
feat[Future]:compatible java1.8 CompletableFuture
Apr 4, 2023
964db3c
feat[Future]:compatible java1.8 CompletableFuture
Apr 4, 2023
7badd31
Update maven.yml
dajitui Apr 4, 2023
e1d0a9b
feat[Future]:格式化回退
Apr 4, 2023
6856188
Merge remote-tracking branch 'origin/master'
Apr 4, 2023
b7944a2
Update maven.yml
dajitui Apr 4, 2023
87311b3
feat[Future]:格式化回退
Apr 4, 2023
8c06b4e
feat[Future]:格式化回退
Apr 4, 2023
aa9a1be
feat[Future]:新增测试demo
Apr 4, 2023
f8250df
feat[Future]:回滚
Apr 4, 2023
0a1b407
feat[Future]:回滚
Apr 5, 2023
4d13a3e
feat[Future]:新增测试
Apr 5, 2023
34c4c09
feat[Future]:新增日志
Apr 5, 2023
adf85aa
feat[Future]:新增单测
Apr 5, 2023
f7cda08
feat[Future]:files end with a newline
Apr 5, 2023
8efc624
feat[Future]:新增单测
Apr 6, 2023
34025ee
feat[Future]:最后一行新增换行
Apr 6, 2023
255393c
feat[Future]:删除日志
Apr 6, 2023
4452ae1
feat[Future]:新增Logger类
Apr 6, 2023
9456932
feat[Future]:新增Logger类
Apr 6, 2023
63478bc
feat[Future]:新增Http2ClearTextHessianTest单测
Apr 6, 2023
57a1ecc
feat[Future]:回滚ResponseFuture为继承关系
Apr 12, 2023
db14130
fix[Future]:修复单测
Apr 12, 2023
b85bb7c
fix[Future]:修复HystrixResponseFuture
Apr 12, 2023
9c793f5
fix[Future]:修复HystrixResponseFuture
Apr 12, 2023
6800b15
fix[Future]:修复HystrixResponseFuture
Apr 12, 2023
dfdc8ec
Merge branch 'sofastack:master' into master
dajitui Apr 12, 2023
e7049d6
Merge branch 'sofastack:master' into master
dajitui May 5, 2023
88602af
fix[ResponseFuture]:回退泛型返回type
May 5, 2023
593578e
fix[ResponseFuture]:回滚泛型
May 6, 2023
cd7d416
Update AbstractResponseFuture.java
dajitui May 6, 2023
3ca6ee8
feat[兼容completableFuture]:兼容completableFuture 请求完成、异常的处理
May 24, 2023
09a51f7
Merge remote-tracking branch 'origin/master'
May 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

import com.alipay.sofa.rpc.context.RpcRuntimeContext;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

/**
* @author <a href="mailto:[email protected]">GengZhang</a>
* @since 5.4.0
*/
public abstract class AbstractResponseFuture<V> implements ResponseFuture<V> {
public abstract class AbstractResponseFuture<V> extends CompletableFuture<V> implements ResponseFuture<V> {

protected static final CancellationException CANCELLATION_CAUSE = new CancellationException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;

/**
* 响应Future,可以调用get方法进行获取响应,也可以注入监听器,有结果或者都会通知
*
* @author <a href=mailto:[email protected]>GengZhang</a>
*/
public interface ResponseFuture<V> extends Future<V> {
public interface ResponseFuture<V> extends Future<V>, CompletionStage<V> {

/**
* 增加多个响应监听器
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.message.AbstractResponseFuture;
import com.alipay.sofa.rpc.message.ResponseFuture;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -136,48 +137,69 @@ public void testAttachment() {
Assert.assertNull(context.removeAttachment("11"));
}

@Test
public void testClear() {
RpcInternalContext context = RpcInternalContext.getContext();
context.setRemoteAddress("127.0.0.1", 1234);
context.setLocalAddress("127.0.0.1", 2345);
context.setFuture(new ResponseFuture<String>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
private static class MyResponseFuture extends AbstractResponseFuture<String> {

@Override
public boolean isCancelled() {
return false;
}
/**
* 构造函数
*
* @param timeout
*/
public MyResponseFuture(int timeout) {
super(timeout);
}

@Override
public boolean isDone() {
return false;
}
@Override
public ResponseFuture addListeners(List<SofaResponseCallback> sofaResponseCallbacks) {
return null;
}

@Override
public String get() throws InterruptedException, ExecutionException {
return null;
}
@Override
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
return null;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return null;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public ResponseFuture addListeners(List<SofaResponseCallback> sofaResponseCallbacks) {
return null;
}
@Override
public boolean isCancelled() {
return false;
}

@Override
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
return null;
}
});
@Override
public boolean isDone() {
return false;
}

@Override
public void notifyListeners() {

}

@Override
public String get() throws InterruptedException, ExecutionException {
return null;
}

@Override
protected String getNow() throws ExecutionException {
return null;
}

@Override
protected void releaseIfNeed(Object result) {

}
}

@Test
public void testClear() {
RpcInternalContext context = RpcInternalContext.getContext();
context.setRemoteAddress("127.0.0.1", 1234);
context.setLocalAddress("127.0.0.1", 2345);
context.setFuture(new MyResponseFuture(1));

context.setProviderInfo(ProviderHelper.toProviderInfo("127.0.0.1:80"));
context.setAttachment("_xxxx", "yyyy");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,14 @@
import com.netflix.hystrix.HystrixCommand;

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

/**
* the {@link Future}(from {@link HystrixCommand#queue()}) wrapper that can be used as a {@link ResponseFuture}
*
* @author <a href=mailto:[email protected]>ScienJus</a>
*/
public class HystrixResponseFuture implements ResponseFuture {
public class HystrixResponseFuture extends CompletableFuture implements ResponseFuture {

private Future delegate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.RequestBase;
import com.alipay.sofa.rpc.message.AbstractResponseFuture;
import com.alipay.sofa.rpc.message.ResponseFuture;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -231,49 +232,81 @@ public void testRegulationEffective() throws InterruptedException {

}

private static class MyResponseFuture extends AbstractResponseFuture<String> {

/**
* 构造函数
*
* @param timeout
*/
public MyResponseFuture(int timeout) {
super(timeout);
}

@Override
public ResponseFuture addListeners(List<SofaResponseCallback> sofaResponseCallbacks) {
return null;
}

@Override
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
return null;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return false;
}

@Override
public void notifyListeners() {

}

@Override
public String get() throws InterruptedException, ExecutionException {
return null;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return null;
}

@Override
protected String getNow() throws ExecutionException {
return null;
}

@Override
protected void releaseIfNeed(Object result) {

}
}

private void prepareInvokeContext() {
final RpcInvokeContext context = new RpcInvokeContext();
context.setResponseCallback(new SofaResponseCallback() {
@Override
public void onAppResponse(final Object appResponse, String methodName, RequestBase request) {
//放到 future 中方便测试.
LOGGER.info("回调成功" + appResponse);
context.setFuture(new ResponseFuture<String>() {
@Override
public ResponseFuture addListeners(List<SofaResponseCallback> sofaResponseCallbacks) {
return null;
}

@Override
public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
return null;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return false;
}

context.setFuture(new MyResponseFuture(1) {
@Override
public String get() throws InterruptedException, ExecutionException {
return (String) appResponse;
}

@Override
public String get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException {
return null;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void testAll() {

// 链路异步化调用--正常
RpcInvokeContext.getContext().setResponseCallback(buildCallback(ret, latch));

String ret0 = asyncHelloService.sayHello("xxx", 22);
Assert.assertNull(ret0); // 第一次返回null
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.filter.Filter;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import com.alipay.sofa.rpc.test.HelloService;
import com.alipay.sofa.rpc.test.HelloServiceImpl;
import com.alipay.sofa.rpc.triple.TripleHessianInvokeTest;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -44,6 +47,8 @@
*/
public class FutureTest extends ActivelyDestroyTest {

private static final Logger LOGGER = LoggerFactory.getLogger(FutureTest.class);

@Test
public void testAll() {

Expand Down Expand Up @@ -152,6 +157,10 @@ public void testAll() {
String ret = helloService.sayHello("xxx", 22);
Assert.assertNull(ret); // 第一次返回null

RpcInvokeContext.getContext().getFuture().thenAccept(req->{
LOGGER.info("FutureTest RpcInvokeContext CompletableFuture result: {}", req);
});

Thread.sleep(1500); // 1s 过去,被rpc设置超时了
Future future = SofaResponseFuture.getFuture();
ret = (String) future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.RequestBase;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.message.ResponseFuture;
import com.alipay.sofa.rpc.server.bolt.pb.EchoRequest;
import com.alipay.sofa.rpc.server.bolt.pb.Group;
Expand All @@ -43,6 +45,8 @@
*/
public class Http2ClearTextHessianTest extends ActivelyDestroyTest {

private static final Logger LOGGER = LoggerFactory.getLogger(Http2ClearTextHessianTest.class);

@Test
public void testHessian() {
// 只有1个线程 执行
Expand Down Expand Up @@ -116,6 +120,9 @@ public void testHessian() {
ResponseFuture<ExampleObj> future = RpcInvokeContext.getContext().getFuture();
try {
response = future.get();
RpcInvokeContext.getContext().getFuture().thenAccept(req->{
LOGGER.info("Http2ClearTextHessianTest CompletableFuture result: {}", req);
});
Assert.assertEquals(200, response.getId());
Assert.assertEquals("yyyxx", response.getName());
} catch (Exception e) {
Expand Down