Skip to content

Commit

Permalink
Merge pull request #106 from QNJR-GROUP/1.2.x
Browse files Browse the repository at this point in the history
1.2.x

* 增加注解开启Provider的功能
* 对SpringBoot 2.0.x及Spring Cloud F版进行适配
* 增加自动补偿功能(整合阿里Fescar的AT功能)
  • Loading branch information
skyesx authored Feb 17, 2019
2 parents 1678e6e + 52018c0 commit 6d83235
Show file tree
Hide file tree
Showing 118 changed files with 4,061 additions and 496 deletions.
34 changes: 29 additions & 5 deletions easytrans-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
</dependency>

<dependency>
<groupId>com.alibaba.fescar</groupId>
<artifactId>fescar-rm-datasource</artifactId>
<version>0.2.0</version>
</dependency>

<!-- for override a spring class method that imported common-logging -->
<dependency>
Expand All @@ -60,12 +66,30 @@



<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>

<dependency>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ public static class StringCodecKeys{

public final static String EscapeChar = "_";

public static class DataSourceRelative {
public static final String DATA_SOURCE = "RELATIVE_DS";
public static final String TRANSACTION_MANAGER = "RELATIVE_TM";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public AfterTransMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteS

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String AFTER_TRANS_METHOD_NAME = "afterTransaction";

private static final String AFTER_TRANS_METHOD_FUTURE_PREFIX = "ATMF";

Expand Down Expand Up @@ -88,7 +87,7 @@ public boolean onDismatch(final LogProcessContext logCtx, Content leftContent) {
FutureTask<Object> futureTask = new FutureTask<Object>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), afterTransCallContent.getCallSeq(), AFTER_TRANS_METHOD_NAME, afterTransCallContent.getParams(),logCtx);
return rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), afterTransCallContent.getCallSeq(), AfterMasterTransMethod.AFTER_TRANSACTION, afterTransCallContent.getParams(),logCtx);
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.yiqiniu.easytrans.executor;

import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yiqiniu.easytrans.context.LogProcessContext;
import com.yiqiniu.easytrans.context.event.DemiLogEventHandler;
import com.yiqiniu.easytrans.core.EasyTransSynchronizer;
import com.yiqiniu.easytrans.core.LogProcessor;
import com.yiqiniu.easytrans.core.RemoteServiceCaller;
import com.yiqiniu.easytrans.log.vo.Content;
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtCallCommitedContent;
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtCallRollbackedContent;
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtPreCallContent;
import com.yiqiniu.easytrans.protocol.BusinessIdentifer;
import com.yiqiniu.easytrans.protocol.EasyTransRequest;
import com.yiqiniu.easytrans.protocol.autocps.AutoCpsMethod;
import com.yiqiniu.easytrans.util.ReflectUtil;

@RelativeInterface(AutoCpsMethod.class)
public class AutoCpsMethodExecutor implements EasyTransExecutor,LogProcessor,DemiLogEventHandler {

private EasyTransSynchronizer transSynchronizer;
private RemoteServiceCaller rpcClient;

public AutoCpsMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServiceCaller rpcClient) {
super();
this.transSynchronizer = transSynchronizer;
this.rpcClient = rpcClient;
}

private Logger LOG = LoggerFactory.getLogger(this.getClass());

@Override
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(final Integer callSeq, final P params) {
final LogProcessContext logProcessContext = transSynchronizer.getLogProcessContext();
Callable<R> callable = new Callable<R>() {
@Override
public R call() throws Exception {
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, AutoCpsMethod.DO_AUTO_CPS_BUSINESS, params,logProcessContext);
}
};

FescarAtPreCallContent content = new FescarAtPreCallContent();
content.setParams(params);
content.setCallSeq(callSeq);
return transSynchronizer.executeMethod(callable, content);
}

@Override
public boolean logProcess(LogProcessContext ctx, Content currentContent) {
if(currentContent instanceof FescarAtPreCallContent){
FescarAtPreCallContent preCallContent = (FescarAtPreCallContent) currentContent;
//register DemiLogEvent
ctx.getDemiLogManager().registerSemiLogEventListener(preCallContent, this);
}
return true;
}

@Override
public boolean onMatch(LogProcessContext logCtx, Content leftContent, Content rightContent) {
return true;// do nothig
}

@Override
public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
FescarAtPreCallContent preCallContent = (FescarAtPreCallContent) leftContent;
EasyTransRequest<?,?> params = preCallContent.getParams();
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
if(logCtx.getFinalMasterTransStatus() == null){
LOG.info("final trans status unknown,process later." + logCtx.getLogCollection());
return false;//unknown,process later
}else if(logCtx.getFinalMasterTransStatus()){
//commit
//execute confirm and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), AutoCpsMethod.DO_AUTO_CPS_COMMIT, preCallContent.getParams(),logCtx);
FescarAtCallCommitedContent committedContent = new FescarAtCallCommitedContent();
committedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(committedContent);
return true;
}else{
//roll back
//execute cancel and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), AutoCpsMethod.DO_AUTO_CPS_ROLLBACK, preCallContent.getParams(),logCtx);
FescarAtCallRollbackedContent rollbackedContent = new FescarAtCallRollbackedContent();
rollbackedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(rollbackedContent);
return true;
}
}

@Override
public boolean preLogProcess(LogProcessContext ctx, Content currentContent) {
return true;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ public CompensableMethodExecutor(EasyTransSynchronizer transSynchronizer, Remote

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String COMPENSABLE_BUSINESS_METHOD_NAME = "doCompensableBusiness";
private static final String COMPENSATION_METHOD_NAME = "compensation";

@Override
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(final Integer sameBusinessCallSeq, final P params) {
final LogProcessContext logProcessContext = transSynchronizer.getLogProcessContext();
Callable<R> callable = new Callable<R>() {
@Override
public R call() throws Exception {
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sameBusinessCallSeq, COMPENSABLE_BUSINESS_METHOD_NAME, params,logProcessContext);
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sameBusinessCallSeq, CompensableMethod.DO_COMPENSABLE_BUSINESS, params,logProcessContext);
}
};

Expand Down Expand Up @@ -86,7 +83,7 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else{
//roll back
//execute compensation and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCpsContent.getCallSeq(), COMPENSATION_METHOD_NAME, preCpsContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCpsContent.getCallSeq(), CompensableMethod.COMPENSATION, preCpsContent.getParams(),logCtx);
CompensatedContent compensatedContent = new CompensatedContent();
compensatedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(compensatedContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public SagaTccMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServ

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String TRY_METHOD_NAME = "sagaTry";
private static final String CONFIRM_METHOD_NAME = "sagaConfirm";
private static final String CANCEL_METHOD_NAME = "sagaCancel";



@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -81,7 +76,7 @@ public boolean preLogProcess(LogProcessContext ctx, Content currentContent) {
EasyTransRequest<?, ?> params = sagaLog.getParams();
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
try {
rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sagaLog.getCallSeq(), TRY_METHOD_NAME, params,ctx);
rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sagaLog.getCallSeq(), SagaTccMethod.SAGA_TRY, params,ctx);
} catch (Exception e) {
LOG.warn("saga try call failed" + sagaLog,e);
//execute failed, vote to roll back
Expand Down Expand Up @@ -117,15 +112,15 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else if(logCtx.getFinalMasterTransStatus()){
//commit
//execute confirm and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CONFIRM_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), SagaTccMethod.SAGA_CONFIRM, preCallContent.getParams(),logCtx);
SagaTccCallConfirmedContent tccCallConfirmedContent = new SagaTccCallConfirmedContent();
tccCallConfirmedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallConfirmedContent);
return true;
}else{
//roll back
//execute cancel and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CANCEL_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), SagaTccMethod.SAGA_CANCEL, preCallContent.getParams(),logCtx);
SagaTccCallCancelledContent tccCallCanceledContent = new SagaTccCallCancelledContent();
tccCallCanceledContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallCanceledContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,14 @@ public TccMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServiceC

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String TRY_METHOD_NAME = "doTry";
private static final String CONFIRM_METHOD_NAME = "doConfirm";
private static final String CANCEL_METHOD_NAME = "doCancel";

@Override
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(final Integer callSeq, final P params) {
final LogProcessContext logProcessContext = transSynchronizer.getLogProcessContext();
Callable<R> callable = new Callable<R>() {
@Override
public R call() throws Exception {
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, TRY_METHOD_NAME, params,logProcessContext);
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, TccMethod.DO_TRY, params,logProcessContext);
}
};

Expand Down Expand Up @@ -82,15 +78,15 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else if(logCtx.getFinalMasterTransStatus()){
//commit
//execute confirm and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CONFIRM_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), TccMethod.DO_CONFIRM, preCallContent.getParams(),logCtx);
TccCallConfirmedContent tccCallConfirmedContent = new TccCallConfirmedContent();
tccCallConfirmedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallConfirmedContent);
return true;
}else{
//roll back
//execute cancel and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CANCEL_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), TccMethod.DO_CANCEL, preCallContent.getParams(),logCtx);
TccCallCancelledContent tccCallCanceledContent = new TccCallCancelledContent();
tccCallCanceledContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallCanceledContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.sql.DataSource;

import org.springframework.core.annotation.Order;
import org.springframework.transaction.PlatformTransactionManager;

import com.yiqiniu.easytrans.core.EasytransConstant;
import com.yiqiniu.easytrans.datasource.DataSourceSelector;
import com.yiqiniu.easytrans.protocol.EasyTransRequest;
import com.yiqiniu.easytrans.protocol.MethodTransactionStatus;
import com.yiqiniu.easytrans.provider.factory.ListableProviderFactory;
Expand All @@ -16,9 +20,11 @@ public class MetaDataFilter implements EasyTransFilter {

private static ThreadLocal<Map<String, Object>> threadLocalHeader = new ThreadLocal<>();
private ListableProviderFactory providerFactory;
private DataSourceSelector selector;

public MetaDataFilter(ListableProviderFactory providerFactory){
public MetaDataFilter(ListableProviderFactory providerFactory, DataSourceSelector selector){
this.providerFactory = providerFactory;
this.selector = selector;
}

@Override
Expand All @@ -27,14 +33,29 @@ public EasyTransResult invoke(EasyTransFilterChain filterChain, Map<String, Obje

EasyTransResult result = null;
try {

addParentTrxStatusToHeader(header,filterChain,request);
addRelativeDatasourceToHeader(header, filterChain, request);
threadLocalHeader.set(header);

result = filterChain.invokeFilterChain(header, request);
} finally {
threadLocalHeader.remove();
}
return result;
}

private void addRelativeDatasourceToHeader(Map<String, Object> header, EasyTransFilterChain filterChain, EasyTransRequest<?, ?> request) {
DataSource relativeDataSource = selector.selectDataSource(filterChain.getAppId(), filterChain.getBusCode(), request);
if(relativeDataSource != null) {
header.put(EasytransConstant.DataSourceRelative.DATA_SOURCE, relativeDataSource);
}

PlatformTransactionManager relativeTransactionManager = selector.selectTransactionManager(filterChain.getAppId(), filterChain.getBusCode(), request);
if(relativeTransactionManager != null) {
header.put(EasytransConstant.DataSourceRelative.TRANSACTION_MANAGER, relativeTransactionManager);
}
}

private void addParentTrxStatusToHeader(Map<String, Object> header, EasyTransFilterChain filterChain ,EasyTransRequest<?, ?> request) {
int transactionStatus = getTransactionStatus(filterChain.getAppId(),filterChain.getBusCode(),filterChain.getInnerMethodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
import com.yiqiniu.easytrans.core.LogProcessor;
import com.yiqiniu.easytrans.executor.AfterTransMethodExecutor;
import com.yiqiniu.easytrans.executor.CompensableMethodExecutor;
import com.yiqiniu.easytrans.executor.AutoCpsMethodExecutor;
import com.yiqiniu.easytrans.executor.ReliableMessageMethodExecutor;
import com.yiqiniu.easytrans.executor.SagaTccMethodExecutor;
import com.yiqiniu.easytrans.executor.TccMethodExecutor;
import com.yiqiniu.easytrans.log.vo.aft.AfterTransCallRegisterContent;
import com.yiqiniu.easytrans.log.vo.aft.AfterTransCalledContent;
import com.yiqiniu.easytrans.log.vo.compensable.CompensatedContent;
import com.yiqiniu.easytrans.log.vo.compensable.PreCompensableCallContent;
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtCallCommitedContent;
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtCallRollbackedContent;
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtPreCallContent;
import com.yiqiniu.easytrans.log.vo.msg.MessageRecordContent;
import com.yiqiniu.easytrans.log.vo.msg.MessageSentContent;
import com.yiqiniu.easytrans.log.vo.saga.PreSagaTccCallContent;
Expand Down Expand Up @@ -68,6 +72,9 @@ public static enum ContentType{
PreSagaTccCall(12,SagaTccMethodExecutor.class,PreSagaTccCallContent.class),
SagaTccCallConfirmed(13,null,SagaTccCallConfirmedContent.class),
SagaTccCallCanceled(14,null,SagaTccCallCancelledContent.class),
FescarAtPreCall(15,AutoCpsMethodExecutor.class,FescarAtPreCallContent.class),
FescarAtCommited(16,null,FescarAtCallCommitedContent.class),
FescarAtRollbacked(17,null,FescarAtCallRollbackedContent.class),
;

private static HashMap<Integer,ContentType> map = new HashMap<Integer, Content.ContentType>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.yiqiniu.easytrans.log.vo.fescar;

import com.yiqiniu.easytrans.log.vo.AfterCommit;
import com.yiqiniu.easytrans.log.vo.DemiRightContent;

@AfterCommit
public class FescarAtCallCommitedContent extends DemiRightContent {

private static final long serialVersionUID = 1L;

@Override
public int getLogType() {
return ContentType.FescarAtCommited.getContentTypeId();
}
}
Loading

0 comments on commit 6d83235

Please sign in to comment.