Skip to content

Commit

Permalink
feat: add annotation support
Browse files Browse the repository at this point in the history
  • Loading branch information
skyesx committed Feb 17, 2019
1 parent d129ae8 commit 52018c0
Show file tree
Hide file tree
Showing 66 changed files with 1,447 additions and 466 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public AfterTransMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteS

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String AFTER_TRANS_METHOD_NAME = "afterTransaction";

private static final String AFTER_TRANS_METHOD_FUTURE_PREFIX = "ATMF";

Expand Down Expand Up @@ -88,7 +87,7 @@ public boolean onDismatch(final LogProcessContext logCtx, Content leftContent) {
FutureTask<Object> futureTask = new FutureTask<Object>(new Callable<Object>() {
@Override
public Object call() throws Exception {
return rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), afterTransCallContent.getCallSeq(), AFTER_TRANS_METHOD_NAME, afterTransCallContent.getParams(),logCtx);
return rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), afterTransCallContent.getCallSeq(), AfterMasterTransMethod.AFTER_TRANSACTION, afterTransCallContent.getParams(),logCtx);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,31 @@
import com.yiqiniu.easytrans.log.vo.fescar.FescarAtPreCallContent;
import com.yiqiniu.easytrans.protocol.BusinessIdentifer;
import com.yiqiniu.easytrans.protocol.EasyTransRequest;
import com.yiqiniu.easytrans.protocol.fescar.FescarAtMethod;
import com.yiqiniu.easytrans.protocol.autocps.AutoCpsMethod;
import com.yiqiniu.easytrans.util.ReflectUtil;

@RelativeInterface(FescarAtMethod.class)
public class FescarAtMethodExecutor implements EasyTransExecutor,LogProcessor,DemiLogEventHandler {
@RelativeInterface(AutoCpsMethod.class)
public class AutoCpsMethodExecutor implements EasyTransExecutor,LogProcessor,DemiLogEventHandler {

private EasyTransSynchronizer transSynchronizer;
private RemoteServiceCaller rpcClient;

public FescarAtMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServiceCaller rpcClient) {
public AutoCpsMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServiceCaller rpcClient) {
super();
this.transSynchronizer = transSynchronizer;
this.rpcClient = rpcClient;
}

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String TRY_METHOD_NAME = "doFescarAtBusiness";
private static final String CONFIRM_METHOD_NAME = "doFescarAtCommit";
private static final String CANCEL_METHOD_NAME = "doFescarAtRollback";

@Override
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(final Integer callSeq, final P params) {
final LogProcessContext logProcessContext = transSynchronizer.getLogProcessContext();
Callable<R> callable = new Callable<R>() {
@Override
public R call() throws Exception {
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, TRY_METHOD_NAME, params,logProcessContext);
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, AutoCpsMethod.DO_AUTO_CPS_BUSINESS, params,logProcessContext);
}
};

Expand Down Expand Up @@ -82,15 +78,15 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else if(logCtx.getFinalMasterTransStatus()){
//commit
//execute confirm and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CONFIRM_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), AutoCpsMethod.DO_AUTO_CPS_COMMIT, preCallContent.getParams(),logCtx);
FescarAtCallCommitedContent committedContent = new FescarAtCallCommitedContent();
committedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(committedContent);
return true;
}else{
//roll back
//execute cancel and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CANCEL_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), AutoCpsMethod.DO_AUTO_CPS_ROLLBACK, preCallContent.getParams(),logCtx);
FescarAtCallRollbackedContent rollbackedContent = new FescarAtCallRollbackedContent();
rollbackedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(rollbackedContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,14 @@ public CompensableMethodExecutor(EasyTransSynchronizer transSynchronizer, Remote

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String COMPENSABLE_BUSINESS_METHOD_NAME = "doCompensableBusiness";
private static final String COMPENSATION_METHOD_NAME = "compensation";

@Override
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(final Integer sameBusinessCallSeq, final P params) {
final LogProcessContext logProcessContext = transSynchronizer.getLogProcessContext();
Callable<R> callable = new Callable<R>() {
@Override
public R call() throws Exception {
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sameBusinessCallSeq, COMPENSABLE_BUSINESS_METHOD_NAME, params,logProcessContext);
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sameBusinessCallSeq, CompensableMethod.DO_COMPENSABLE_BUSINESS, params,logProcessContext);
}
};

Expand Down Expand Up @@ -86,7 +83,7 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else{
//roll back
//execute compensation and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCpsContent.getCallSeq(), COMPENSATION_METHOD_NAME, preCpsContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCpsContent.getCallSeq(), CompensableMethod.COMPENSATION, preCpsContent.getParams(),logCtx);
CompensatedContent compensatedContent = new CompensatedContent();
compensatedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(compensatedContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public SagaTccMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServ

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String TRY_METHOD_NAME = "sagaTry";
private static final String CONFIRM_METHOD_NAME = "sagaConfirm";
private static final String CANCEL_METHOD_NAME = "sagaCancel";



@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -81,7 +76,7 @@ public boolean preLogProcess(LogProcessContext ctx, Content currentContent) {
EasyTransRequest<?, ?> params = sagaLog.getParams();
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
try {
rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sagaLog.getCallSeq(), TRY_METHOD_NAME, params,ctx);
rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), sagaLog.getCallSeq(), SagaTccMethod.SAGA_TRY, params,ctx);
} catch (Exception e) {
LOG.warn("saga try call failed" + sagaLog,e);
//execute failed, vote to roll back
Expand Down Expand Up @@ -117,15 +112,15 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else if(logCtx.getFinalMasterTransStatus()){
//commit
//execute confirm and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CONFIRM_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), SagaTccMethod.SAGA_CONFIRM, preCallContent.getParams(),logCtx);
SagaTccCallConfirmedContent tccCallConfirmedContent = new SagaTccCallConfirmedContent();
tccCallConfirmedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallConfirmedContent);
return true;
}else{
//roll back
//execute cancel and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CANCEL_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), SagaTccMethod.SAGA_CANCEL, preCallContent.getParams(),logCtx);
SagaTccCallCancelledContent tccCallCanceledContent = new SagaTccCallCancelledContent();
tccCallCanceledContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallCanceledContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,14 @@ public TccMethodExecutor(EasyTransSynchronizer transSynchronizer, RemoteServiceC

private Logger LOG = LoggerFactory.getLogger(this.getClass());

private static final String TRY_METHOD_NAME = "doTry";
private static final String CONFIRM_METHOD_NAME = "doConfirm";
private static final String CANCEL_METHOD_NAME = "doCancel";

@Override
public <P extends EasyTransRequest<R,E>,E extends EasyTransExecutor,R extends Serializable> Future<R> execute(final Integer callSeq, final P params) {
final LogProcessContext logProcessContext = transSynchronizer.getLogProcessContext();
Callable<R> callable = new Callable<R>() {
@Override
public R call() throws Exception {
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(params.getClass());
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, TRY_METHOD_NAME, params,logProcessContext);
return (R) rpcClient.call(businessIdentifer.appId(), businessIdentifer.busCode(), callSeq, TccMethod.DO_TRY, params,logProcessContext);
}
};

Expand Down Expand Up @@ -82,15 +78,15 @@ public boolean onDismatch(LogProcessContext logCtx, Content leftContent) {
}else if(logCtx.getFinalMasterTransStatus()){
//commit
//execute confirm and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CONFIRM_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), TccMethod.DO_CONFIRM, preCallContent.getParams(),logCtx);
TccCallConfirmedContent tccCallConfirmedContent = new TccCallConfirmedContent();
tccCallConfirmedContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallConfirmedContent);
return true;
}else{
//roll back
//execute cancel and then write Log
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), CANCEL_METHOD_NAME, preCallContent.getParams(),logCtx);
rpcClient.callWithNoReturn(businessIdentifer.appId(), businessIdentifer.busCode(), preCallContent.getCallSeq(), TccMethod.DO_CANCEL, preCallContent.getParams(),logCtx);
TccCallCancelledContent tccCallCanceledContent = new TccCallCancelledContent();
tccCallCanceledContent.setLeftDemiConentId(leftContent.getcId());
logCtx.getLogCache().cacheLog(tccCallCanceledContent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import com.yiqiniu.easytrans.core.LogProcessor;
import com.yiqiniu.easytrans.executor.AfterTransMethodExecutor;
import com.yiqiniu.easytrans.executor.CompensableMethodExecutor;
import com.yiqiniu.easytrans.executor.FescarAtMethodExecutor;
import com.yiqiniu.easytrans.executor.AutoCpsMethodExecutor;
import com.yiqiniu.easytrans.executor.ReliableMessageMethodExecutor;
import com.yiqiniu.easytrans.executor.SagaTccMethodExecutor;
import com.yiqiniu.easytrans.executor.TccMethodExecutor;
Expand Down Expand Up @@ -72,7 +72,7 @@ public static enum ContentType{
PreSagaTccCall(12,SagaTccMethodExecutor.class,PreSagaTccCallContent.class),
SagaTccCallConfirmed(13,null,SagaTccCallConfirmedContent.class),
SagaTccCallCanceled(14,null,SagaTccCallCancelledContent.class),
FescarAtPreCall(15,FescarAtMethodExecutor.class,FescarAtPreCallContent.class),
FescarAtPreCall(15,AutoCpsMethodExecutor.class,FescarAtPreCallContent.class),
FescarAtCommited(16,null,FescarAtCallCommitedContent.class),
FescarAtRollbacked(17,null,FescarAtCallRollbackedContent.class),
;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.yiqiniu.easytrans.protocol;

import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import com.yiqiniu.easytrans.executor.EasyTransExecutor;

public abstract class AnnotationBusinessProviderBuilder implements ApplicationContextAware {

private ApplicationContext ctx;

protected ApplicationContext getApplicationContext() {
return ctx;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.ctx = applicationContext;
}


public abstract Class<? extends Annotation> getTargetAnnotation();

public abstract BusinessProvider<?> create(Annotation ann, Object proxyObj, Method targetMethod, Class<?> requestClass, String beanName);

public abstract Class<? extends EasyTransRequest<?, ?>> getActualConfigClass(Annotation ann, Class<?> requestClass);

@SuppressWarnings("rawtypes")
public static class NullEasyTransRequest<R extends Serializable,E extends EasyTransExecutor> implements EasyTransRequest{
private static final long serialVersionUID = 1L;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.yiqiniu.easytrans.protocol;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessorAdapter;

import com.yiqiniu.easytrans.util.ReflectUtil;

public class AnnotationProviderRegister extends InstantiationAwareBeanPostProcessorAdapter {

private Map<Class<? extends Annotation>,AnnotationBusinessProviderBuilder> mapHandler;
private ConfigurableListableBeanFactory beanFactory;

public AnnotationProviderRegister(List<AnnotationBusinessProviderBuilder> listHandler, ConfigurableListableBeanFactory beanFactory) {
this.beanFactory = beanFactory;
mapHandler = new HashMap<>(listHandler.size() * 2);
for(AnnotationBusinessProviderBuilder handler :listHandler) {
mapHandler.put(handler.getTargetAnnotation(), handler);
}
}


@Override
public boolean postProcessAfterInstantiation(Object bean, String beanName) throws BeansException {
if(bean instanceof AnnotationBusinessProviderBuilder) {
return true;
}

Class<?> targetClass = AopUtils.getTargetClass(bean);
for(Method m : targetClass.getMethods()) {
for(Annotation a : m.getAnnotations()) {
AnnotationBusinessProviderBuilder builder = mapHandler.get(a.annotationType());
if(builder != null) {

Class<?>[] parameterTypes = m.getParameterTypes();
if(parameterTypes.length != 1) {
throw new IllegalArgumentException("method should conatins only One parameter, method:" + m);
}

@SuppressWarnings("unchecked")
Class<? extends EasyTransRequest<?, ?>> requestClass = (Class<? extends EasyTransRequest<?, ?>>) parameterTypes[0];

//create and register in bean factory
BusinessProvider<?> provider = builder.create(a, bean, m, requestClass,beanName);

Class<? extends EasyTransRequest<?, ?>> cfgClass = builder.getActualConfigClass(a, requestClass);
BusinessIdentifer businessIdentifer = ReflectUtil.getBusinessIdentifer(cfgClass);

if(businessIdentifer == null) {
throw new RuntimeException("can not find BusinessIdentifer in request class!" + requestClass);
}

beanFactory.registerSingleton(businessIdentifer.appId() + "-" + businessIdentifer.busCode() +"-provider", provider);
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

public interface BusinessProvider<P extends EasyTransRequest<?, ?>>{

public static final String GET_IDEMPOTENT_TYPE = "getIdempotentType";

/**
* Idempotent implement by Framework code<br/>
* this will take extract performance cost,but it will help decrease the complexity of business<br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@


public interface MessageBusinessProvider<P extends EasyTransRequest<?, ?>> extends BusinessProvider<P> {
//indicate isSynchronousMethod to be true,to save the return value

public static final String CONSUME = "consume";

//indicate isSynchronousMethod to be true,to save the return value
@ExecuteOrder(doNotExecuteAfter = {}, ifNotExecutedReturnDirectly = {}, isSynchronousMethod=true)
@MethodTransactionStatus(TransactionStatus.COMMITTED)
public EasyTransConsumeAction consume(EasyTransRequest<?, ?> request);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.yiqiniu.easytrans.protocol;

public interface RequestClassAware {

public static final String GET_REQUEST_CLASS = "getRequestClass";

Class<? extends EasyTransRequest<?, ?>> getRequestClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
* Methods here should be idempotent
*/
public interface AfterMasterTransMethod<P extends AfterMasterTransRequest<R>, R extends Serializable> extends RpcBusinessProvider<P>{
@MethodTransactionStatus(TransactionStatus.COMMITTED)

public static final String AFTER_TRANSACTION = "afterTransaction";

@MethodTransactionStatus(TransactionStatus.COMMITTED)
R afterTransaction(P param);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.yiqiniu.easytrans.protocol.aft;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import com.yiqiniu.easytrans.protocol.AnnotationBusinessProviderBuilder.NullEasyTransRequest;
import com.yiqiniu.easytrans.protocol.EasyTransRequest;

/**
* place in method of the bean that managed by spring
* this method will do the TCC try
* confirmMethod and cancelMethod should in the same class
* @author xudeyou
*
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("rawtypes")
public @interface EtAfterMasterTrans {

/**
* BusinessProvider.IDENPOTENT_TYPE_FRAMEWORK
* BusinessProvider.IDENPOTENT_TYPE_BUSINESS
* @return
*/
int idempotentType();


/**
* 当标注的方法的入参并非继承自EasyTransRequest时,需要用本字段指定继承自该类的类
* @return
*/
Class<? extends EasyTransRequest> cfgClass() default NullEasyTransRequest.class;


}
Loading

0 comments on commit 52018c0

Please sign in to comment.